geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From r..@apache.org
Subject [34/51] [partial] incubator-geode git commit: WAN and CQ code drop under the Pivotal SGA
Date Wed, 20 Jan 2016 02:22:40 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/6df75241/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
new file mode 100644
index 0000000..52c454f
--- /dev/null
+++ b/gemfire-cq/src/test/java/com/gemstone/gemfire/internal/cache/PutAllCSDUnitTest.java
@@ -0,0 +1,4408 @@
+/*=========================================================================
+ * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved.
+ * This product is protected by U.S. and international copyright
+ * and intellectual property laws. Pivotal products are covered by
+ * one or more patents listed at http://www.pivotal.io/patents.
+ *=========================================================================
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import com.gemstone.gemfire.DataSerializable;
+import com.gemstone.gemfire.DataSerializer;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheException;
+import com.gemstone.gemfire.cache.CacheWriterException;
+import com.gemstone.gemfire.cache.CommitConflictException;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.DiskStore;
+import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.InterestResultPolicy;
+import com.gemstone.gemfire.cache.Operation;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.Region.Entry;
+import com.gemstone.gemfire.cache.RegionDestroyedException;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.cache.client.ClientCache;
+import com.gemstone.gemfire.cache.client.ClientCacheFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionFactory;
+import com.gemstone.gemfire.cache.client.ClientRegionShortcut;
+import com.gemstone.gemfire.cache.client.PoolFactory;
+import com.gemstone.gemfire.cache.client.PoolManager;
+import com.gemstone.gemfire.cache.client.ServerConnectivityException;
+import com.gemstone.gemfire.cache.client.ServerOperationException;
+import com.gemstone.gemfire.cache.persistence.PartitionOfflineException;
+import com.gemstone.gemfire.cache.query.CqAttributes;
+import com.gemstone.gemfire.cache.query.CqAttributesFactory;
+import com.gemstone.gemfire.cache.query.CqClosedException;
+import com.gemstone.gemfire.cache.query.CqEvent;
+import com.gemstone.gemfire.cache.query.CqException;
+import com.gemstone.gemfire.cache.query.CqExistsException;
+import com.gemstone.gemfire.cache.query.CqListener;
+import com.gemstone.gemfire.cache.query.CqQuery;
+import com.gemstone.gemfire.cache.query.QueryInvalidException;
+import com.gemstone.gemfire.cache.query.QueryService;
+import com.gemstone.gemfire.cache.query.RegionNotFoundException;
+import com.gemstone.gemfire.cache.query.SelectResults;
+import com.gemstone.gemfire.cache.query.Struct;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.server.CacheServer;
+import com.gemstone.gemfire.cache.util.CacheListenerAdapter;
+import com.gemstone.gemfire.cache.util.CacheWriterAdapter;
+import com.gemstone.gemfire.cache30.ClientServerTestCase;
+import com.gemstone.gemfire.cache30.CacheSerializableRunnable;
+import com.gemstone.gemfire.distributed.internal.DistributionConfig;
+import com.gemstone.gemfire.internal.AvailablePort;
+import com.gemstone.gemfire.internal.AvailablePortHelper;
+import com.gemstone.gemfire.internal.cache.LocalRegion.NonTXEntry;
+import com.gemstone.gemfire.internal.cache.versions.VersionTag;
+import com.gemstone.gemfire.internal.i18n.LocalizedStrings;
+
+import dunit.AsyncInvocation;
+import dunit.DistributedTestCase;
+import dunit.Host;
+import dunit.SerializableCallable;
+import dunit.SerializableRunnable;
+import dunit.VM;
+
+/**
+ * Tests putAll for c/s. Also tests removeAll
+ * 
+ * @author Gester Zhou
+ * @since 5.0.23
+ */
+public class PutAllCSDUnitTest extends ClientServerTestCase {
+
+  final int numberOfEntries = 100;
+  
+  final int testEndPointSwitchNumber = 200;
+  
+  final int thousandEntries = 1000;
+  
+  final int TOTAL_BUCKETS = 10;
+
+  static Object lockObject = new Object();
+
+  static Object lockObject2 = new Object();
+  
+  static Object lockObject3 = new Object();
+  
+  static Object lockObject4 = new Object();
+  
+  final String expectedExceptions = PutAllPartialResultException.class.getName()+"||"
+  + ServerConnectivityException.class.getName()+"||"+RegionDestroyedException.class.getName()+"||java.net.ConnectException";
+
+  // public static void caseTearDown() throws Exception {
+  // disconnectAllFromDS();
+  // }
+
+  /**
+   * Creates a new <code>GemFireMemberStatusDUnitTest</code>
+   */
+  public PutAllCSDUnitTest(String name) {
+    super(name);
+  }
+
+  // ////// Test Methods
+
+  private static void checkRegionSize(final Region region, final int expectedSize) {
+    WaitCriterion ev = new WaitCriterion() {
+      public boolean done() {
+        return region.size() == expectedSize; 
+      }
+      public String description() {
+        return null;
+      }
+    };
+    DistributedTestCase.waitForCriterion(ev, 10 * 1000, 1000, true);
+    assertEquals(expectedSize, region.size());
+  }
+  
+  /**
+   * Tests putAll to one server.
+   * 
+   * @throws InterruptedException
+   */
+public void testOneServer() throws CacheException, InterruptedException {
+    final String title = "testOneServer:";
+    final Host host = Host.getHost(0);
+    VM server = host.getVM(0);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    final int serverPort = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server.getHost());
+
+    // set <false, true> means <PR=false, notifyBySubscription=true> to enable registerInterest and CQ
+    createBridgeServer(server, regionName, serverPort, false, 0, null);
+    createClient(client1, regionName, serverHost, new int[] {serverPort}, -1, -1, false, true, true);
+    createClient(client2, regionName, serverHost, new int[] {serverPort}, -1, -1, false, true, true);
+
+    server.invoke(new CacheSerializableRunnable(title+"server add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 registerInterest and add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        
+        // registerInterest for ALL_KEYS
+        region.registerInterest("ALL_KEYS");
+        getLogWriter().info("client2 registerInterest ALL_KEYS at "+region.getFullPath());
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 create local region and run putAll") {
+      public void run2() throws CacheException {
+        AttributesFactory factory2 = new AttributesFactory();
+        factory2.setScope(Scope.LOCAL);
+        factory2.addCacheListener(new MyListener(false));
+        createRegion("localsave", factory2.create());
+        
+        Region region = doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });   
+        
+    AsyncInvocation async1 = client1.invokeAsync(new CacheSerializableRunnable(title+"client1 create CQ") {
+      public void run2() throws CacheException {
+        // create a CQ for key 10-20
+ 	Region localregion = getRootRegion().getSubregion("localsave");
+        CqAttributesFactory cqf1 = new CqAttributesFactory();
+        EOCQEventListener EOCQListener = new EOCQEventListener(localregion);
+        cqf1.addCqListener(EOCQListener);
+        CqAttributes cqa1 = cqf1.create();
+        String cqName1 = "EOInfoTracker";
+        String queryStr1 = "SELECT ALL * FROM /root/"+regionName+" ii WHERE ii.getTicker() >= '10' and ii.getTicker() < '20'";
+        getLogWriter().info("Query String: "+queryStr1);
+        try {
+          QueryService cqService = getCache().getQueryService();
+          CqQuery EOTracker = cqService.newCq(cqName1, queryStr1, cqa1);
+          SelectResults rs1 = EOTracker.executeWithInitialResults();
+            
+          List list1 = rs1.asList();
+          for (int i=0; i<list1.size(); i++) {
+            Struct s = (Struct)list1.get(i);
+            TestObject o = (TestObject)s.get("value");
+            getLogWriter().info("InitialResult:"+i+":"+o);
+            localregion.put("key-"+i, o);
+          }
+          if (localregion.size() > 0) {
+            getLogWriter().info("CQ is ready");
+            synchronized(lockObject) {
+              lockObject.notify();
+            }
+          }
+
+          waitTillNotify(lockObject2, 20000, (EOCQListener.num_creates == 5 && EOCQListener.num_updates == 5));
+          EOTracker.close();
+        }
+        catch (CqClosedException e) {
+          fail("CQ", e);
+        }
+        catch (RegionNotFoundException e) {
+          fail("CQ", e);
+        }
+        catch (QueryInvalidException e) {
+          fail("CQ", e);
+        }
+        catch (CqExistsException e) {
+          fail("CQ", e);
+        }
+        catch (CqException e) {
+          fail("CQ", e);
+        }
+      }
+    });
+    
+    server.invoke(new CacheSerializableRunnable(title+"verify Bridge Server") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries, region.size());
+        for (int i=0; i<numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-"+i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    // verify CQ is ready
+    client1.invoke(new CacheSerializableRunnable(title+"verify CQ is ready") {
+      public void run2() throws CacheException {
+        Region localregion = getRootRegion().getSubregion("localsave");
+        waitTillNotify(lockObject, 10000, (localregion.size()>0));
+        assertTrue(localregion.size()>0);
+      }
+    });
+
+    // verify registerInterest result at client2
+    client2.invoke(new CacheSerializableRunnable(title+"verify client2") {
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i=0; i<numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-"+i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+        
+        // then do update for key 10-20 to trigger CQ at server2
+        // destroy key 10-14 to simulate create/update mix case
+        region.removeAll(Arrays.asList("key-10", "key-11", "key-12", "key-13", "key-14"), "removeAllCallback");
+        assertEquals(null, region.get("key-10"));
+        assertEquals(null, region.get("key-11"));
+        assertEquals(null, region.get("key-12"));
+        assertEquals(null, region.get("key-13"));
+        assertEquals(null, region.get("key-14"));
+      }
+    });
+    
+    // verify CQ result at client1
+    client1.invoke(new CacheSerializableRunnable(title+"Verify client1") {
+      public void run2() throws CacheException {
+        Region localregion = getRootRegion().getSubregion("localsave");
+        for (int i=10; i<15; i++) {
+          TestObject obj = null; 
+          int cnt = 0;
+          while (cnt < 100) {
+            obj = (TestObject)localregion.get("key-"+i);
+            if (obj != null) {
+              // wait for the key to be destroyed
+              pause(100);
+              if (getLogWriter().fineEnabled()) {
+                getLogWriter().info("Waiting 100ms("+cnt+") for key-" + i + " to be destroyed");
+              }
+              cnt++;
+            } else {
+              break;
+            }
+          }
+        }
+      }
+    });
+    client2.invoke(new CacheSerializableRunnable(title+"verify client2") {
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(regionName);
+        LinkedHashMap map = new LinkedHashMap();
+        for (int i=10; i<20; i++) {
+          map.put("key-"+i, new TestObject(i*10));
+        }
+        region.putAll(map, "putAllCallback");
+      }
+    });
+    // verify CQ result at client1
+    client1.invoke(new CacheSerializableRunnable(title+"Verify client1") {
+      public void run2() throws CacheException {
+        Region localregion = getRootRegion().getSubregion("localsave");
+        for (int i=10; i<20; i++) {
+          TestObject obj = null; 
+          int cnt = 0;
+          while (cnt < 100) {
+            obj = (TestObject)localregion.get("key-"+i);
+            if (obj == null || obj.getPrice() != i*10) {
+              pause(100);
+              getLogWriter().info("Waiting 100ms("+cnt+") for obj.getPrice() == i*10 at entry "+i);
+              cnt++;
+            } else {
+              break;
+            }
+          }
+          assertEquals(i*10, obj.getPrice());
+        }
+        synchronized(lockObject2) {
+          lockObject2.notify();
+        }
+      }
+    });
+
+    DistributedTestCase.join(async1, 30 * 1000, getLogWriter());
+    
+    // verify stats for client putAll into distributed region
+    // 1. verify client staus
+    /*
+     * server2.invoke(new CacheSerializableRunnable("server2 execute putAll") {
+     * public void run2() throws CacheException { try { DistributedSystemConfig
+     * config = AdminDistributedSystemFactory.defineDistributedSystem(system,
+     * null); AdminDistributedSystem ads =
+     * AdminDistributedSystemFactory.getDistributedSystem(config);
+     * ads.connect(); DistributedMember distributedMember =
+     * system.getDistributedMember(); SystemMember member =
+     * ads.lookupSystemMember(distributedMember);
+     * 
+     * StatisticResource[] resources = member.getStats(); for (int i=0; i<resources.length;
+     * i++) { System.out.println("GGG:"+resources[i].getType()); if
+     * (resources[i].getType().equals("CacheServerClientStats")) { Statistic[]
+     * stats = resources[i].getStatistics(); for (int j=0; i<stats.length; i++) {
+     * if (stats[j].getName().equals("putAll")) {
+     * System.out.println("GGG:"+stats[j].getName()+":"+stats[j].getValue()); }
+     * else if (stats[j].getName().equals("sendPutAllTime")) {
+     * System.out.println("GGG:"+stats[j].getName()+":"+stats[j].getValue()); } } } } }
+     * catch (AdminException e) {
+     * fail("Failed while creating AdminDS", e); } } });
+     */
+
+    // Test Exception handling
+    // verify CQ is ready
+    client1.invoke(new CacheSerializableRunnable(title+"test exception handling") {
+      public void run2() throws CacheException {
+    	Region region = getRootRegion().getSubregion(regionName);
+        Map m = null;
+        boolean NPEthrowed = false;
+        try{
+          region.putAll(m, "putAllCallback");
+          fail("Should have thrown NullPointerException");
+        }
+        catch (NullPointerException ex) {
+          NPEthrowed = true;
+        }
+        assertTrue(NPEthrowed);
+
+        region.localDestroyRegion();
+        boolean RDEthrowed = false;
+        try{
+          m = new HashMap();
+          for(int i=1; i<21; i++) {
+            m.put(new Integer(i), Integer.toString(i));
+          }
+          region.putAll(m, "putAllCallback");
+          fail("Should have thrown RegionDestroyedException");
+        }
+        catch (RegionDestroyedException ex){
+          RDEthrowed = true;
+        }
+        assertTrue(RDEthrowed);
+        try{
+          region.removeAll(Arrays.asList("key-10", "key-11"), "removeAllCallback");
+          fail("Should have thrown RegionDestroyedException");
+        }
+        catch (RegionDestroyedException expected) {
+        }
+      }
+    });
+    
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
+   * Tests putAll afterUpdate event contained oldValue.
+   * 
+   * @throws InterruptedException
+   */
+  public void testOldValueInEvent() throws CacheException, InterruptedException {
+    final String title = "testOldValueInEvent:";
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+  
+    // set notifyBySubscription=false to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, false, 0, null);
+    createBridgeServer(server2, regionName, serverPort2, false, 0, null);
+    createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true, true);
+    createClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, -1, false, true, true);
+  
+    client2.invoke(new CacheSerializableRunnable(title+"client2 registerInterest and add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        
+        // registerInterest for ALL_KEYS
+        region.registerInterest("ALL_KEYS");
+        getLogWriter().info("client2 registerInterest ALL_KEYS at "+region.getFullPath());
+      }
+    });
+  
+    client1.invoke(new CacheSerializableRunnable(title+"client1 create local region and run putAll") {
+      public void run2() throws CacheException {
+        // create keys
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, title, numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+  
+        // update keys
+        doPutAll(regionName, title, numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });
+    
+    // verify 41890, the local PUTALL_UPDATE event should contain old value
+    client1.invoke(new CacheSerializableRunnable(title+"verify after update events") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        waitTillNotify(lockObject, 20000, (region.size() == numberOfEntries));
+      }
+    });
+        
+    client2.invoke(new CacheSerializableRunnable(title+"verify after update events") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        waitTillNotify(lockObject, 20000, (region.size() == numberOfEntries));
+      }
+    });
+    
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
+   * Tests putAll and removeAll to 2 servers. Use Case: 1) putAll from a single-threaded
+   * client to a replicated region 2) putAll from a multi-threaded client to a
+   * replicated region 3)
+   */
+  public void test2Server() throws CacheException, InterruptedException {
+    final String title = "test2Server:";
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set notifyBySubscription=false to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, false, 0, null);
+    createBridgeServer(server2, regionName, serverPort2, false, 0, null);
+    createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, true);
+    createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, -1, true);
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+      }
+    });   
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doRemoveAll(regionName, "key-", numberOfEntries);
+        assertEquals(0, region.size());
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        assertEquals(numberOfEntries, mywriter.num_destroyed);
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        // beforeDestroys are only triggered at server1 since the removeAll is submitted from client1
+        assertEquals(0, mywriter.num_destroyed);
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });   
+
+    {
+    // Execute client putAll from multithread client
+    AsyncInvocation async1 = client1.invokeAsync(new CacheSerializableRunnable(
+        title + "async putAll1 from client1") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "async1key-", numberOfEntries);
+      }
+    });
+    AsyncInvocation async2 = client1.invokeAsync(new CacheSerializableRunnable(
+        title + "async putAll2 from client1") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "async2key-", numberOfEntries);
+      }
+    });
+
+    DistributedTestCase.join(async1, 30 * 1000, getLogWriter());
+    DistributedTestCase.join(async2, 30 * 1000, getLogWriter());
+    }
+
+    client1.invoke(new CacheSerializableRunnable(title
+        + "verify client 1 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries*2, region.size());
+        long ts1 = 0, ts2 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("async1key-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+
+          obj = (TestObject)region.getEntry("async2key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts2);
+          ts2 = obj.getTS();
+        }
+      }
+    });
+
+    // verify bridge server 1 for asyn keys
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries*2, region.size());
+        long ts1 = 0, ts2 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("async1key-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+
+          obj = (TestObject)region.getEntry("async2key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts2);
+          ts2 = obj.getTS();
+        }
+      }
+    });
+    // verify bridge server 2 for asyn keys
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries*2, region.size());
+        long ts1 = 0, ts2 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("async1key-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+
+          obj = (TestObject)region.getEntry("async2key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts2);
+          ts2 = obj.getTS();
+        }
+      }
+    });
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify async putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries*2);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("async1key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("async2key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+
+    {
+      // Execute client removeAll from multithread client
+      AsyncInvocation async1 = client1.invokeAsync(new CacheSerializableRunnable(
+          title + "async putAll1 from client1") {
+        public void run2() throws CacheException {
+          doRemoveAll(regionName, "async1key-", numberOfEntries);
+        }
+      });
+      AsyncInvocation async2 = client1.invokeAsync(new CacheSerializableRunnable(
+          title + "async putAll2 from client1") {
+        public void run2() throws CacheException {
+          doRemoveAll(regionName, "async2key-", numberOfEntries);
+        }
+      });
+
+      DistributedTestCase.join(async1, 30 * 1000, getLogWriter());
+      DistributedTestCase.join(async2, 30 * 1000, getLogWriter());
+    }
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doRemoveAll(regionName, "key-", numberOfEntries);
+        assertEquals(0, region.size());
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify async removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify async removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify async removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+    
+    // Execute p2p putAll
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 execute P2P putAll") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "p2pkey-", numberOfEntries);
+        Region region = getRootRegion().getSubregion(regionName);
+        long ts1 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("p2pkey-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+        }
+      }
+    });
+
+    // verify bridge server 2 for p2p keys
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2 for p2p keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        long ts1 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("p2pkey-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+        }
+      }
+    });
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify p2p putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("p2pkey-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+    client1.invoke(new CacheSerializableRunnable(title+"client1 verify p2p putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("p2pkey-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+
+    // Execute p2p removeAll
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 execute P2P removeAll") {
+      public void run2() throws CacheException {
+        doRemoveAll(regionName, "p2pkey-", numberOfEntries);
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify p2p removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify p2p removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 verify p2p removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+
+    // putAll at client2 to trigger local-invalidates at client1
+    client2.invoke(new CacheSerializableRunnable(title
+        + "execute putAll on client2 for key 0-10") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "key-", 10);
+      }
+    });
+
+    // verify client 2 for key 0-10
+    client1.invoke(new CacheSerializableRunnable(title
+        + "verify client1 for local invalidate") {
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(regionName);
+        for (int i = 0; i < 10; i++) {
+          final int ii = i;
+          WaitCriterion ev = new WaitCriterion() {
+            public boolean done() {
+              Entry entry = region.getEntry("key-" + ii);
+              return entry != null  &&  entry.getValue() == null;
+            }
+            public String description() {
+              return null;
+            }
+          };
+          DistributedTestCase.waitForCriterion(ev, 10 * 1000, 1000, true);
+          // local invalidate will set the value to null
+          TestObject obj = null;
+          obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(null, obj);
+        }
+      }
+    });
+
+    // clean up
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  
+  /* same as test2Server(), but all the servers are using policy normal */
+  private int createServerRegion(VM vm, final String regionName, final boolean CCE) {
+    SerializableCallable createRegion = new SerializableCallable() {
+      public Object call() throws Exception {
+        AttributesFactory af = new AttributesFactory();
+        af.setConcurrencyChecksEnabled(CCE);
+        af.setScope(Scope.DISTRIBUTED_ACK);
+        af.setDataPolicy(DataPolicy.NORMAL);
+        createRootRegion(new AttributesFactory().create());
+        Region region = createRegion(regionName, af.create());
+
+        CacheServer server = getCache().addCacheServer();
+        int port = AvailablePortHelper.getRandomAvailableTCPPort();
+        server.setPort(port);
+        server.start();
+        return port;
+      }
+    };
+
+    return (Integer) vm.invoke(createRegion);
+  }
+  
+  public void doTest2NormalServerCCE(boolean CCE) throws CacheException, InterruptedException {
+    final String title = "doTest2NormalServerCCE="+CCE+":";
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set notifyBySubscription=false to test local-invalidates
+    int serverPort1 = createServerRegion(server1, regionName, CCE);
+    int serverPort2 = createServerRegion(server2, regionName, CCE);
+    createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, true);
+    createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, true);
+    
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+      }
+    });   
+
+    // test case 1: putAll and removeAll to server1
+    client1.invoke(new CacheSerializableRunnable(title+"client1 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "case1-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("case1-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+        region.put(numberOfEntries, "specialvalue");
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("case1-"));
+        region.localDestroy(numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("case1-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("case1-"));
+        // normal policy will not distribute create events
+        assertEquals(0, region.size());
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        LocalRegion region = (LocalRegion)getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries+1, region.size());
+        // do removeAll with some keys not exist
+        doRemoveAll(regionName, "case1-", numberOfEntries*2);
+        assertEquals(1, region.size());
+        region.localDestroy(numberOfEntries); // do cleanup
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        assertEquals(numberOfEntries, mywriter.num_destroyed);
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        // beforeDestroys are only triggered at server1 since the removeAll is submitted from client1
+        assertEquals(0, mywriter.num_destroyed);
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });   
+
+    // test case 2: putAll to server1, removeAll to server2
+    client1.invoke(new CacheSerializableRunnable(title+"client1 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doPutAll(regionName, "case2-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("case2-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("case2-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // normal policy will not distribute create events
+        assertEquals(0, region.size());
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doRemoveAll(regionName, "case2-", numberOfEntries);
+        assertEquals(0, region.size());
+      }
+    });   
+
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(100, region.size());
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 verify removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 100);
+        doRemoveAll(regionName, "case2-", numberOfEntries); // do cleanup
+      }
+    });   
+
+    // test case 3: removeAll a list with duplicated keys 
+    client1.invoke(new CacheSerializableRunnable(title+"put 3 keys then removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.put("case3-1", "case3-1");
+        region.put("case3-2", "case3-2");
+        region.put("case3-3", "case3-3");
+        assertEquals(3, region.size());
+        ArrayList keys = new ArrayList();
+        keys.add("case3-1");
+        keys.add("case3-2");
+        keys.add("case3-3");
+        keys.add("case3-1");
+        region.removeAll(keys, "removeAllCallback");
+        assertEquals(0, region.size());
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    // clean up
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+  
+  public void test2NormalServerCCE() throws CacheException, InterruptedException {
+    doTest2NormalServerCCE(true);
+    disconnectAllFromDS();
+    doTest2NormalServerCCE(false);
+    disconnectAllFromDS();
+  }
+  
+  public void testPRServerRVDuplicatedKeys() throws CacheException, InterruptedException {
+    doRVDuplicatedKeys(true, 1);
+    disconnectAllFromDS();
+    doRVDuplicatedKeys(true, 0);
+    disconnectAllFromDS();
+    doRVDuplicatedKeys(false, 1);
+    disconnectAllFromDS();
+  }
+  
+  public void doRVDuplicatedKeys(final boolean isPR, final int redundantCopies) throws CacheException, InterruptedException {
+    final String title = "doRVDuplicatedKeys:";
+    //  disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, isPR, redundantCopies, null);
+    createBridgeServer(server2, regionName, serverPort2, isPR, redundantCopies, null);
+    createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false);
+    createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, false);
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+      }
+    });   
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });   
+
+    // test case 3: removeAll a list with duplicated keys 
+    client1.invoke(new CacheSerializableRunnable(title+"put 3 keys then removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.put("case3-1", "case3-1");
+        region.put("case3-2", "case3-2");
+        region.put("case3-3", "case3-3");
+      }
+    });   
+
+    client2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        pause(5000);
+        Region region = getRootRegion().getSubregion(regionName);
+        Region.Entry re = region.getEntry("case3-1");
+        assertNotNull(re);
+        re = region.getEntry("case3-2");
+        assertNotNull(re);
+        re = region.getEntry("case3-3");
+        assertNotNull(re);
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"put 3 keys then removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        ArrayList keys = new ArrayList();
+        keys.add("case3-1");
+        keys.add("case3-2");
+        keys.add("case3-3");
+        keys.add("case3-1");
+        region.removeAll(keys, "removeAllCallback");
+        Region.Entry re = region.getEntry("case3-1");
+        assertNull(re);
+        re = region.getEntry("case3-2");
+        assertNull(re);
+        re = region.getEntry("case3-3");
+        assertNull(re);
+      }
+    });
+
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        Region.Entry re = region.getEntry("case3-1");
+        assertNull(re);
+        re = region.getEntry("case3-2");
+        assertNull(re);
+        re = region.getEntry("case3-3");
+        assertNull(re);
+      }
+    });
+
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        Region.Entry re = region.getEntry("case3-1");
+        assertNull(re);
+        re = region.getEntry("case3-2");
+        assertNull(re);
+        re = region.getEntry("case3-3");
+        assertNull(re);
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        pause(5000);
+        Region region = getRootRegion().getSubregion(regionName);
+        Region.Entry re = region.getEntry("case3-1");
+        assertNull(re);
+        re = region.getEntry("case3-2");
+        assertNull(re);
+        re = region.getEntry("case3-3");
+        assertNull(re);
+      }
+    });
+
+    // clean up
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+  
+  public void testBug51725() throws CacheException, InterruptedException {
+    doBug51725(false);
+    disconnectAllFromDS();
+  }
+
+  public void testBug51725_singlehup() throws CacheException, InterruptedException {
+    doBug51725(true);
+    disconnectAllFromDS();
+  }
+
+  /**
+   * One failure found in bug 51725, both for putAll and removeAll:
+   * non-singlehop with redundency=1. Do some singlehop putAll to see if retry succeeded after PRE
+   * This is a singlehop putAll test.
+   */
+  public void doBug51725(boolean enableSingleHop) throws CacheException, InterruptedException {
+    final String title = "doBug51725:";
+    int client1Size;
+    int client2Size;
+    int server1Size;
+    int server2Size;
+
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    final VM client1 = host.getVM(2);
+    final VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, true, 0, "ds1");
+    createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
+    createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false, false, enableSingleHop);
+    createClient(client2, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false, false, enableSingleHop);
+
+    server1.invoke(addExceptionTag1(expectedExceptions));
+    server2.invoke(addExceptionTag1(expectedExceptions));
+    client1.invoke(addExceptionTag1(expectedExceptions));
+    client2.invoke(addExceptionTag1(expectedExceptions));
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        
+        region.registerInterest("ALL_KEYS");
+        getLogWriter().info("client2 registerInterest ALL_KEYS at "+region.getFullPath());
+      }
+    });
+    
+    client1.invoke(new CacheSerializableRunnable(title+"put 3 keys then removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        // do putAll to create all buckets
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+      }
+    });   
+    
+    closeCache(server2);
+    client1.invoke(new CacheSerializableRunnable(title+"putAll from client again") {
+      public void run2() throws CacheException {
+        LocalRegion region = (LocalRegion)getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+
+        try {
+          doPutAll(regionName, title, numberOfEntries);
+          fail("Expect ServerOperationException caused by PutAllParitialResultException");
+        } catch (ServerOperationException soe) {
+          assertTrue(soe.getMessage().contains(LocalizedStrings.Region_PutAll_Applied_PartialKeys_At_Server_0.toLocalizedString(region.getFullPath())));
+          assertTrue(soe.getCause() instanceof RuntimeException);
+        }
+        assertEquals(numberOfEntries*3/2, region.size());
+        VersionTag tag;
+        int cnt=0;
+        for (int i=0; i<numberOfEntries; i++) {
+          tag = region.getVersionTag(title+i);
+          if (tag != null && 1 == tag.getEntryVersion()) {
+            cnt++;
+          }
+        }
+        assertEquals(numberOfEntries/2, cnt);
+
+        try {
+          doRemoveAll(regionName, title, numberOfEntries);
+        } catch (ServerOperationException soe) {
+          assertTrue(soe.getMessage().contains(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_At_Server_0.toLocalizedString(region.getFullPath())));
+          assertTrue(soe.getCause() instanceof RuntimeException);
+        }
+        Region.Entry re;
+        // putAll only created 50 entries, removeAll removed them. So 100 entries are all NULL
+        for (int i=0; i<numberOfEntries; i++) {
+          re = region.getEntry(title+i);
+          assertNull(re);
+        }
+      }
+    });
+    
+    client2.invoke(new CacheSerializableRunnable(title+"verify entries from client2") {
+      public void run2() throws CacheException {
+        pause(5000);
+        Region region = getRootRegion().getSubregion(regionName);
+        Region.Entry re;
+        for (int i=0; i<numberOfEntries; i++) {
+          re = region.getEntry(title+i);
+          assertNull(re);
+        }
+      }
+    });   
+
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
+   * Tests putAll to 2 PR servers.
+   */
+  public void testPRServer() throws CacheException, InterruptedException {
+    final String title = "testPRServer:";
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, true, 1, null);
+    createBridgeServer(server2, regionName, serverPort2, true, 1, null);
+    createBridgeClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, 59000, false);
+    createBridgeClient(client2, regionName, serverHost, new int[] {serverPort2}, -1, 59000, false);
+    
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+      }
+    });   
+    
+    client1.invoke(new CacheSerializableRunnable(title+"client1 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        doPutAll(regionName, "key-", numberOfEntries);
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().setCacheWriter(new MyWriter("key-"));
+        assertEquals(numberOfEntries, region.size());
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+        }
+      }
+    });
+    client2.invoke(new CacheSerializableRunnable(title
+        + "verify client2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doRemoveAll(regionName, "key-", numberOfEntries);
+        assertEquals(0, region.size());
+      }
+    });
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        // beforeDestroys are only triggered at primary buckets. server1 and server2 each holds half of buckets
+        assertEquals(numberOfEntries/2, mywriter.num_destroyed);
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+        MyWriter mywriter = (MyWriter)region.getAttributes().getCacheWriter();
+        getLogWriter().info("server cachewriter triggered for destroy: "+mywriter.num_destroyed);
+        // beforeDestroys are only triggered at primary buckets. server1 and server2 each holds half of buckets
+        assertEquals(numberOfEntries/2, mywriter.num_destroyed);
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });   
+
+    // Execute client putAll from multithread client
+    {
+    AsyncInvocation async1 = client1.invokeAsync(new CacheSerializableRunnable(
+        title + "async putAll1 from client1") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "async1key-", numberOfEntries);
+      }
+    });
+    AsyncInvocation async2 = client1.invokeAsync(new CacheSerializableRunnable(
+        title + "async putAll2 from client1") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "async2key-", numberOfEntries);
+      }
+    });
+
+    DistributedTestCase.join(async1, 30 * 1000, getLogWriter());
+    DistributedTestCase.join(async2, 30 * 1000, getLogWriter());
+    }
+    
+    client1.invoke(new CacheSerializableRunnable(title
+        + "verify client 1 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(numberOfEntries*2, region.size());
+        long ts1 = 0, ts2 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("async1key-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+
+          obj = (TestObject)region.getEntry("async2key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts2);
+          ts2 = obj.getTS();
+        }
+      }
+    });
+
+    // verify bridge server 2 for asyn keys
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        long ts1 = 0, ts2 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("async1key-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+
+          obj = (TestObject)region.getEntry("async2key-" + i).getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts2);
+          ts2 = obj.getTS();
+        }
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify async putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries*2);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("async1key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("async2key-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+
+    {
+      // Execute client removeAll from multithread client
+      AsyncInvocation async1 = client1.invokeAsync(new CacheSerializableRunnable(
+          title + "async putAll1 from client1") {
+        public void run2() throws CacheException {
+          doRemoveAll(regionName, "async1key-", numberOfEntries);
+        }
+      });
+      AsyncInvocation async2 = client1.invokeAsync(new CacheSerializableRunnable(
+          title + "async putAll2 from client1") {
+        public void run2() throws CacheException {
+          doRemoveAll(regionName, "async2key-", numberOfEntries);
+        }
+      });
+
+      DistributedTestCase.join(async1, 30 * 1000, getLogWriter());
+      DistributedTestCase.join(async2, 30 * 1000, getLogWriter());
+    }
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        doRemoveAll(regionName, "key-", numberOfEntries);
+        assertEquals(0, region.size());
+      }
+    });   
+
+    // verify bridge server 1, its data are from client
+    server1.invoke(new CacheSerializableRunnable(title
+        + "verify async removeAll Bridge Server 1") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify async removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify async removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+
+    // Execute p2p putAll
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 execute P2P putAll") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "p2pkey-", numberOfEntries);
+        Region region = getRootRegion().getSubregion(regionName);
+        long ts1 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("p2pkey-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+        }
+      }
+    });
+
+    // verify bridge server 2 for p2p keys
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify Bridge Server 2 for async keys") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        long ts1 = 0;
+        for (int i = 0; i < numberOfEntries; i++) {
+          TestObject obj = (TestObject)region.getEntry("p2pkey-" + i)
+              .getValue();
+          assertEquals(i, obj.getPrice());
+          assertTrue(obj.getTS() >= ts1);
+          ts1 = obj.getTS();
+        }
+      }
+    });
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify p2p putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("p2pkey-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+    client1.invoke(new CacheSerializableRunnable(title+"client1 verify p2p putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, numberOfEntries);
+        for (int i = 0; i < numberOfEntries; i++) {
+          Region.Entry re = region.getEntry("p2pkey-" + i);
+          assertNotNull(re);
+          assertEquals(null, re.getValue());
+        }
+      }
+    });   
+
+    // Execute p2p removeAll
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 execute P2P removeAll") {
+      public void run2() throws CacheException {
+        doRemoveAll(regionName, "p2pkey-", numberOfEntries);
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+    // verify bridge server 2, because its data are from distribution
+    server2.invoke(new CacheSerializableRunnable(title
+        + "verify p2p removeAll Bridge Server 2") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        assertEquals(0, region.size());
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 verify p2p removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 verify p2p removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        checkRegionSize(region, 0);
+      }
+    });
+
+    // putAll at client2 to trigger local-invalidates at client1
+    client2.invoke(new CacheSerializableRunnable(title
+        + "execute putAll on client2 for key 0-10") {
+      public void run2() throws CacheException {
+        doPutAll(regionName, "key-", 10);
+      }
+    });
+
+    // verify client 2 for key 0-10
+    client1.invoke(new CacheSerializableRunnable(title
+        + "verify client1 for local invalidate") {
+      public void run2() throws CacheException {
+        final Region region = getRootRegion().getSubregion(regionName);
+        for (int i = 0; i < 10; i++) {
+          final int ii = i;
+          WaitCriterion ev = new WaitCriterion() {
+            public boolean done() {
+              Entry entry = region.getEntry("key-" + ii);
+              return entry != null && entry.getValue() == null;
+            }
+            public String description() {
+              return null;
+            }
+          };
+          DistributedTestCase.waitForCriterion(ev, 10 * 1000, 1000, true);
+          // local invalidate will set the value to null
+          TestObject obj = (TestObject)region.getEntry("key-" + i).getValue();
+          assertEquals(null, obj);
+        }
+      }
+    });
+
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  // Checks to see if a client does a destroy that throws an exception from CacheWriter beforeDestroy
+  // that the size of the region is still correct.
+  // See bug 51583.
+  public void testClientDestroyOfUncreatedEntry() throws CacheException, InterruptedException {
+    final String title = "testClientDestroyOfUncreatedEntry:";
+
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM client1 = host.getVM(1);
+    final String regionName = getUniqueName();
+
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, false, 0, null);
+    createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true, true);
+ 
+    server1.invoke(addExceptionTag1(expectedExceptions));
+    client1.invoke(addExceptionTag1(expectedExceptions));
+
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add cacheWriter") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // Install cacheWriter that causes the very first destroy to fail
+        region.getAttributesMutator().setCacheWriter(new MyWriter(0));
+      }
+    });
+
+    assertEquals(0, getRegionSize(server1, regionName));
+    client1.invoke(new CacheSerializableRunnable(title+"client1 destroy") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        try {
+          region.destroy("bogusKey");
+          fail("Expect ServerOperationException caused by CacheWriterException");
+        } catch (ServerOperationException expected) {
+          assertTrue(expected.getCause() instanceof CacheWriterException);
+        }
+      }
+    });   
+    assertEquals(0, getRegionSize(server1, regionName));
+    
+    server1.invoke(removeExceptionTag1(expectedExceptions));
+    client1.invoke(removeExceptionTag1(expectedExceptions));
+
+    stopBridgeServers(getCache());
+  }
+
+  /**
+   * Tests partial key putAll and removeAll to 2 servers with local region
+   */
+  public void testPartialKeyInLocalRegion() throws CacheException, InterruptedException {
+    final String title = "testPartialKeyInLocalRegion:";
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    final VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    final VM client1 = host.getVM(2);
+    final VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, false, 0, null);
+    createBridgeServer(server2, regionName, serverPort2, false, 0, null);
+    createClient(client1, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true, true);
+    createClient(client2, regionName, serverHost, new int[] {serverPort1}, -1, -1, false, true, true);
+    
+    server1.invoke(addExceptionTag1(expectedExceptions));
+    server2.invoke(addExceptionTag1(expectedExceptions));
+    client1.invoke(addExceptionTag1(expectedExceptions));
+    client2.invoke(addExceptionTag1(expectedExceptions));
+
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add cacheWriter") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // let the server to trigger exception after created 15 keys
+        region.getAttributesMutator().setCacheWriter(new MyWriter(15));
+      }
+    });
+
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+        
+        region.registerInterest("ALL_KEYS");
+        getLogWriter().info("client1 registerInterest ALL_KEYS at "+region.getFullPath());
+      }
+    });   
+
+    client1.invoke(new CacheSerializableRunnable(title+"client1 putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+
+        // create keys
+        try {
+          doPutAll(regionName, title, numberOfEntries);
+          fail("Expect ServerOperationException caused by PutAllParitialResultException");
+        } catch (ServerOperationException soe) {
+          assertTrue(soe.getMessage().contains(LocalizedStrings.Region_PutAll_Applied_PartialKeys_At_Server_0.toLocalizedString(region.getFullPath())));
+          assertTrue(soe.getCause() instanceof RuntimeException);
+          assertTrue(soe.getCause().getMessage().contains("Triggered exception as planned, created 15 keys"));
+        }
+      }
+    });   
+
+    {
+    WaitCriterion waitForSizes = new WaitCriterion() {
+      public String description() {
+        return "waiting for conditions to be met";
+      }
+      public boolean done() {
+        int c1Size = getRegionSize(client1, regionName);
+        int c2Size = getRegionSize(client2, regionName);
+        int s1Size = getRegionSize(server1, regionName);
+        int s2Size = getRegionSize(server2, regionName);
+        getLogWriter().info("region sizes: "+c1Size+","+c2Size+","+s1Size+","+s2Size);
+        if (c1Size != 15) {
+          getLogWriter().info("waiting for client1 to get all updates");
+          return false;
+        }
+        if (c2Size != 15) {
+          getLogWriter().info("waiting for client2 to get all updates");
+          return false;
+        }
+        if (s1Size != 15) {
+          getLogWriter().info("waiting for server1 to get all updates");
+          return false;
+        }
+        if (s2Size != 15) {
+          getLogWriter().info("waiting for server2 to get all updates");
+          return false;
+        }
+        return true;
+      }
+    };
+    waitForCriterion(waitForSizes, 10000, 1000, true);
+    }
+    int server1Size = getRegionSize(server1, regionName);
+    int server2Size = getRegionSize(server1, regionName);
+
+    // reset cacheWriter's count to allow another 15 keys to be created
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add cacheWriter") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // let the server to trigger exception after created 15 keys
+        region.getAttributesMutator().setCacheWriter(new MyWriter(15));
+      }
+    });
+
+    // p2p putAll on DR and expect exception
+    server2.invoke(new CacheSerializableRunnable(title+"server2 add listener and putAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+
+        // create keys
+        try {
+          doPutAll(regionName, title+"again:", numberOfEntries);
+          fail("Expect original RuntimeException caused by cacheWriter");
+        } catch (RuntimeException rte) {
+          assertTrue(rte.getMessage().contains("Triggered exception as planned, created 15 keys"));
+        }
+      }
+    });   
+
+    server2Size = getRegionSize(server1, regionName);
+    assertEquals(server1Size+15, server2Size);
+    
+    {
+      WaitCriterion waitForSizes = new WaitCriterion() {
+        public String description() {
+          return "waiting for conditions to be met";
+        }
+        public boolean done() {
+          int c1Size = getRegionSize(client1, regionName);
+          int c2Size = getRegionSize(client2, regionName);
+          int s1Size = getRegionSize(server1, regionName);
+          int s2Size = getRegionSize(server2, regionName);
+          getLogWriter().info("region sizes: "+c1Size+","+c2Size+","+s1Size+","+s2Size);
+          if (c1Size != 15) { // client 1 did not register interest
+            getLogWriter().info("waiting for client1 to get all updates");
+            return false;
+          }
+          if (c2Size != 15*2) {
+            getLogWriter().info("waiting for client2 to get all updates");
+            return false;
+          }
+          if (s1Size != 15*2) {
+            getLogWriter().info("waiting for server1 to get all updates");
+            return false;
+          }
+          if (s2Size != 15*2) {
+            getLogWriter().info("waiting for server2 to get all updates");
+            return false;
+          }
+          return true;
+        }
+      };
+      waitForCriterion(waitForSizes, 10000, 1000, true);
+    }
+    
+    // now do a removeAll that is not allowed to remove everything
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add cacheWriter") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // server triggers exception after destroying 5 keys
+        region.getAttributesMutator().setCacheWriter(new MyWriter(5));
+      }
+    });
+    client1.invoke(new CacheSerializableRunnable(title+"client1 removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+
+        // create keys
+        try {
+          doRemoveAll(regionName, title, numberOfEntries);
+          fail("Expect ServerOperationException caused by PutAllParitialResultException");
+        } catch (ServerOperationException soe) {
+          assertTrue(soe.getMessage().contains(LocalizedStrings.Region_RemoveAll_Applied_PartialKeys_At_Server_0.toLocalizedString(region.getFullPath())));
+          assertTrue(soe.getCause() instanceof RuntimeException);
+          assertTrue(soe.getCause().getMessage().contains("Triggered exception as planned, destroyed 5 keys"));
+        }
+      }
+    });   
+    {
+      WaitCriterion waitForSizes = new WaitCriterion() {
+        public String description() {
+          return "waiting for conditions to be met";
+        }
+        public boolean done() {
+          int c1Size = getRegionSize(client1, regionName);
+          int c2Size = getRegionSize(client2, regionName);
+          int s1Size = getRegionSize(server1, regionName);
+          int s2Size = getRegionSize(server2, regionName);
+          getLogWriter().info("region sizes: "+c1Size+","+c2Size+","+s1Size+","+s2Size);
+          if (c1Size != 15-5) { // client 1 did not register interest
+            getLogWriter().info("waiting for client1 to get all destroys");
+            return false;
+          }
+          if (c2Size != (15*2)-5) {
+            getLogWriter().info("waiting for client2 to get all destroys");
+            return false;
+          }
+          if (s1Size != (15*2)-5) {
+            getLogWriter().info("waiting for server1 to get all destroys");
+            return false;
+          }
+          if (s2Size != (15*2)-5) {
+            getLogWriter().info("waiting for server2 to get all destroys");
+            return false;
+          }
+          return true;
+        }
+      };
+      waitForCriterion(waitForSizes, 10000, 1000, true);
+    }
+
+    // reset cacheWriter's count to allow another 5 keys to be destroyed
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add cacheWriter") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        // server triggers exception after destroying 5 keys
+        region.getAttributesMutator().setCacheWriter(new MyWriter(5));
+      }
+    });
+
+    // p2p putAll on DR and expect exception
+    server2.invoke(new CacheSerializableRunnable(title+"server2 add listener and removeAll") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+
+        // create keys
+        try {
+          doRemoveAll(regionName, title+"again:", numberOfEntries);
+          fail("Expect original RuntimeException caused by cacheWriter");
+        } catch (RuntimeException rte) {
+          assertTrue(rte.getMessage().contains("Triggered exception as planned, destroyed 5 keys"));
+        }
+      }
+    });   
+    
+    {
+      WaitCriterion waitForSizes = new WaitCriterion() {
+        public String description() {
+          return "waiting for conditions to be met";
+        }
+        public boolean done() {
+          int c1Size = getRegionSize(client1, regionName);
+          int c2Size = getRegionSize(client2, regionName);
+          int s1Size = getRegionSize(server1, regionName);
+          int s2Size = getRegionSize(server2, regionName);
+          getLogWriter().info("region sizes: "+c1Size+","+c2Size+","+s1Size+","+s2Size);
+          if (c1Size != 15-5) { // client 1 did not register interest
+            getLogWriter().info("waiting for client1 to get all destroys");
+            return false;
+          }
+          if (c2Size != (15*2)-5-5) {
+            getLogWriter().info("waiting for client2 to get all destroys");
+            return false;
+          }
+          if (s1Size != (15*2)-5-5) {
+            getLogWriter().info("waiting for server1 to get all destroys");
+            return false;
+          }
+          if (s2Size != (15*2)-5-5) {
+            getLogWriter().info("waiting for server2 to get all destroys");
+            return false;
+          }
+          return true;
+        }
+      };
+      waitForCriterion(waitForSizes, 10000, 1000, true);
+    }
+    server1.invoke(removeExceptionTag1(expectedExceptions));
+    server2.invoke(removeExceptionTag1(expectedExceptions));
+    client1.invoke(removeExceptionTag1(expectedExceptions));
+    client2.invoke(removeExceptionTag1(expectedExceptions));
+
+    // Stop server
+    stopBridgeServers(getCache());
+  }
+
+  /**
+   * Tests partial key putAll to 2 PR servers, because putting data at server
+   * side is different between PR and LR. PR does it in postPutAll.
+   * It's not running in singleHop putAll
+   */
+  public void testPartialKeyInPR() throws CacheException, InterruptedException {
+    final String title = "testPartialKeyInPR:";
+//    disconnectAllFromDS();
+
+    final Host host = Host.getHost(0);
+    VM server1 = host.getVM(0);
+    final VM server2 = host.getVM(1);
+    VM client1 = host.getVM(2);
+    VM client2 = host.getVM(3);
+    final String regionName = getUniqueName();
+    
+    final int serverPort1 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final int serverPort2 = AvailablePort.getRandomAvailablePort(AvailablePort.SOCKET);
+    final String serverHost = getServerHostName(server1.getHost());
+
+    // set <true, false> means <PR=true, notifyBySubscription=false> to test local-invalidates
+    createBridgeServer(server1, regionName, serverPort1, true, 0, "ds1");
+    createBridgeServer(server2, regionName, serverPort2, true, 0, "ds1");
+    createClient(client1, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1, false, false, true);
+    createClient(client2, regionName, serverHost, new int[] {serverPort1, serverPort2}, -1, -1, false, false, true);
+
+    server1.invoke(addExceptionTag1(expectedExceptions));
+    server2.invoke(addExceptionTag1(expectedExceptions));
+    client1.invoke(addExceptionTag1(expectedExceptions));
+    client2.invoke(addExceptionTag1(expectedExceptions));
+
+    server1.invoke(new CacheSerializableRunnable(title
+        + "server1 add slow listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(true));
+      }
+    });
+
+    final SharedCounter sc_server2 = new SharedCounter("server2");
+    
+    server2.invoke(new CacheSerializableRunnable(title
+        + "server2 add slow listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(server2, true, sc_server2, 10));
+      }
+    });
+    
+    client2.invoke(new CacheSerializableRunnable(title+"client2 add listener") {
+      public void run2() throws CacheException {
+        Region region = getRootRegion().getSubregion(regionName);
+        region.getAttributesMutator().addCacheListener(new MyListener(false));
+       

<TRUNCATED>


Mime
View raw message