geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [16/50] [abbrv] incubator-geode git commit: GEODE-599: fix clear with concurrent writes
Date Thu, 18 Aug 2016 21:18:16 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
index 14a2d2f..cce0b73 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/RegionMap.java
@@ -27,6 +27,7 @@ import com.gemstone.gemfire.cache.EntryNotFoundException;
 import com.gemstone.gemfire.cache.Operation;
 import com.gemstone.gemfire.cache.TimeoutException;
 import com.gemstone.gemfire.cache.TransactionId;
+import com.gemstone.gemfire.internal.cache.AbstractRegionMap.ARMLockTestHook;
 import com.gemstone.gemfire.internal.cache.lru.LRUMapCallbacks;
 import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID;
 import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
@@ -391,4 +392,7 @@ public interface RegionMap extends LRUMapCallbacks {
   public void decTxRefCount(RegionEntry e);
 
   public void close();
+  
+  public ARMLockTestHook getARMLockTestHook();
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
index 8866689..da35c86 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/StateFlushOperation.java
@@ -140,6 +140,11 @@ public class StateFlushOperation  {
       dm.putOutgoing(gr);
       processors.add(processor);
     }
+
+    if(r.getRegionMap().getARMLockTestHook()!=null) {
+      r.getRegionMap().getARMLockTestHook().beforeStateFlushWait();
+    }
+
     for (ReplyProcessor21 processor: processors) {
       try {
         processor.waitForReplies();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java
index 2f02422..c543303 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/versions/RegionVersionVector.java
@@ -403,7 +403,6 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
implements
       this.versionLock.readLock().lock();
     }
   }
-
   
   /** release the lock preventing concurrent clear() from happening */
   public void releaseCacheModificationLock(LocalRegion owner) {
@@ -411,6 +410,16 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
implements
       this.versionLock.readLock().unlock();
     }
   }
+  
+  /** obtain a lock to prevent concurrent clear() from happening */
+  public void lockForCacheModification() {
+    this.versionLock.readLock().lock();
+  }
+
+  /** release the lock preventing concurrent clear() from happening */
+  public void releaseCacheModificationLock() {
+    this.versionLock.readLock().unlock();
+  }
     
   private void syncLocalVersion() {
     long v = localVersion.get();
@@ -1461,6 +1470,7 @@ public abstract class RegionVersionVector<T extends VersionSource<?>>
implements
   public Version[] getSerializationVersions(){
     return null;
   }
+  
 //  /**
 //   * This class will wrap DM member IDs to provide integers that can be stored
 //   * on disk and be timed out in the vector.

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java
new file mode 100644
index 0000000..89bb4ee
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ARMLockTestHookAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import java.io.Serializable;
+import java.util.concurrent.CountDownLatch;
+
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.test.dunit.VM;
+
+public class ARMLockTestHookAdapter implements AbstractRegionMap.ARMLockTestHook, Serializable
{
+
+  public void beforeBulkLock(LocalRegion region) {};
+  public void afterBulkLock(LocalRegion region) {};
+  public void beforeBulkRelease(LocalRegion region) {};
+  public void afterBulkRelease(LocalRegion region) {};
+
+  public void beforeLock(LocalRegion region, CacheEvent event) {};
+  public void afterLock(LocalRegion region, CacheEvent event) {};
+  public void beforeRelease(LocalRegion region, CacheEvent event) {};
+  public void afterRelease(LocalRegion region, CacheEvent event) {};
+
+  public void beforeStateFlushWait() {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/e08c1f54/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java
new file mode 100644
index 0000000..d97fddf
--- /dev/null
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/ClearRvvLockingDUnitTest.java
@@ -0,0 +1,667 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/*
+ * ClearRvvLockingDUnitTest.java
+ *
+ * Created on September 6, 2005, 2:57 PM
+ */
+package com.gemstone.gemfire.internal.cache;
+
+import static com.gemstone.gemfire.test.dunit.Assert.fail;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.logging.log4j.Logger;
+import org.assertj.core.api.JUnitSoftAssertions;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.CacheEvent;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.Region;
+import com.gemstone.gemfire.cache.RegionFactory;
+import com.gemstone.gemfire.cache.RegionShortcut;
+import com.gemstone.gemfire.cache.Scope;
+import com.gemstone.gemfire.distributed.DistributedMember;
+import com.gemstone.gemfire.distributed.DistributedSystem;
+import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember;
+import com.gemstone.gemfire.internal.logging.LogService;
+import com.gemstone.gemfire.test.dunit.Host;
+import com.gemstone.gemfire.test.dunit.SerializableCallable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnable;
+import com.gemstone.gemfire.test.dunit.SerializableRunnableIF;
+import com.gemstone.gemfire.test.dunit.VM;
+import com.gemstone.gemfire.test.dunit.cache.internal.JUnit4CacheTestCase;
+import com.gemstone.gemfire.test.junit.categories.DistributedTest;
+
+/**
+ * Test class to verify proper RVV locking interaction between
+ * entry operations such as PUT/REMOVE and the CLEAR region operation
+ * 
+ * GEODE-599: After an operation completed, it would unlock the RVV.
+ * This was occurring before the operation was distributed to other members
+ * which created a window in which another operation could be performed
+ * prior to that operation being distributed.
+ * 
+ * The fix for GEODE-599 was to not release the lock until after
+ * distributing the operation to the other members.
+ *  
+ */
+
+@SuppressWarnings("serial")
+@Category(DistributedTest.class)
+public class ClearRvvLockingDUnitTest extends JUnit4CacheTestCase {
+
+  @Rule
+  public transient JUnitSoftAssertions softly = new JUnitSoftAssertions();
+  /*
+   * The tests perform a single operation and a concurrent clear.
+   * 
+   * opsVM determines where the single operation will be performed, null will perform the
op on the test VM (vm0)
+   * clearVM determines where the clear operation will be performed, null will perform the
clear on the test VM (vm0)
+   * 
+   * Specifying NULL/NULL for opsVM and clearVM has the effect of performing both in the
same thread
+   * whereas specifying vm0/vm0 for example will run them both on the same vm, but different
threads.
+   * NULL/NULL is not tested here since the same thread performing a clear prior to returning
from a put
+   * is not possible using the public API.
+   * 
+   * Each test is performed twice once with operation and clear on the same vm, once on different
vms.
+   * 
+   */
+  VM vm0, vm1, opsVM, clearVM;
+  
+  static Cache cache;
+  static LocalRegion region;
+  DistributedMember vm0ID, vm1ID;
+  
+  static AbstractRegionMap.ARMLockTestHook theHook;
+
+  static final String THE_KEY = "theKey";
+  static final String THE_VALUE = "theValue";
+
+  private static final Logger logger = LogService.getLogger();
+
+  //test methods
+
+  @Test
+  public void testPutOperationSameVM() {
+    try {
+      setupMembers();
+      setOpAndClearVM(vm0, vm0);   // first arg is where to perform operation, second arg
where to perform clear
+      opsVM.invoke(() -> setBasicHook(opsVM));
+      runConsistencyTest(vm0, performPutOperation);
+      checkForConsistencyErrors();
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testPutOperationDifferentVM() {
+    try {
+      setupMembers();
+      setOpAndClearVM(vm0, vm1);  // first arg is where to perform operation, second arg
where to perform clear   
+      opsVM.invoke(() -> setBasicHook(clearVM));
+      runConsistencyTest(vm0, performPutOperation);
+      checkForConsistencyErrors();
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testPutOperationNoAck() {
+    try {
+      setupNoAckMembers();
+      setOpAndClearVM(vm0, vm0);
+      vm0.invoke(() -> setLocalNoAckHook(vm1));
+      vm1.invoke(() -> setRemoteNoAckHook(vm0));
+      vm0.invoke(() -> primeStep1(1));
+      vm1.invoke(() -> primeStep2(1));
+      runConsistencyTest(vm0, performNoAckPutOperation);
+      checkForConsistencyErrors();
+    } finally {
+      vm0.invoke(() -> resetHook());
+      vm1.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testRemoveOperationSameVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm0);
+      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
+      runConsistencyTest(vm0, performRemoveOperation);
+      checkForConsistencyErrors();
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testRemoveOperationDifferentVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm1);
+      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
+      runConsistencyTest(vm0, performRemoveOperation);
+      checkForConsistencyErrors();
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testInvalidateOperationSameVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm0);
+      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
+      runConsistencyTest(vm0, performInvalidateOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testInvalidateOperationDifferentVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm1);
+      opsVM.invoke(() -> setRemoveAndInvalidateHook(clearVM));
+      runConsistencyTest(vm0, performInvalidateOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testPutAllOperationSameVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm0);
+      opsVM.invoke(() -> setBulkHook(vm0));
+      runConsistencyTest(vm0, performPutAllOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testPutAllOperationDifferentVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm1);
+      opsVM.invoke(() -> setBulkHook(vm0));
+      runConsistencyTest(vm0, performPutAllOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testRemoveAllOperationSameVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm0);
+      opsVM.invoke(() -> setBulkHook(vm0));
+      runConsistencyTest(vm0, performRemoveAllOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+  @Test
+  public void testRemoveAllOperationDifferentVM() {
+    try {
+      setupMembers();    
+      setOpAndClearVM(vm0, vm1);
+      opsVM.invoke(() -> setBulkHook(vm0));
+      runConsistencyTest(vm0, performRemoveAllOperation);
+      checkForConsistencyErrors();   
+    } finally {
+      opsVM.invoke(() -> resetHook());
+    }
+  }
+
+
+  private void invokePut(VM whichVM) {
+    if(whichVM==null) {
+      doPut();
+    } else {
+      whichVM.invoke(() -> doPut());
+    }
+  }
+  
+  private void invokeRemove(VM whichVM) {
+    if(whichVM==null) {
+      doRemove();
+    } else {
+      whichVM.invoke(() -> doRemove());
+    }
+  }
+
+  private void invokeInvalidate(VM whichVM) {
+    if(whichVM==null) {
+      doInvalidate();
+    } else {
+      whichVM.invoke(() -> doInvalidate());
+    }
+  }
+  
+  private void invokePutAll(VM whichVM) {
+    if(whichVM==null) {
+      doPutAll();
+    } else {
+      whichVM.invoke(() -> doPutAll());
+    }
+  }
+  
+  private void invokeRemoveAll(VM whichVM) {
+    if(whichVM==null) {
+      doRemoveAll();
+    } else {
+      whichVM.invoke(() -> doRemoveAll());
+    }
+  }
+
+  private static void invokeClear(VM whichVM) {
+    if(whichVM==null) {
+      doClear();
+    } else {
+      whichVM.invoke(() -> doClear());
+    }
+  }
+
+  // remote test methods
+  
+  private static boolean doesRegionEntryExist(String key) {
+    return region.getRegionEntry(key)!=null;
+  }
+
+  private static void doPut() {
+    region.put(THE_KEY, THE_VALUE);
+  }
+
+  private static void doRemove() {
+    region.remove(THE_KEY);
+  }
+
+  private static void doInvalidate() {
+    region.invalidate(THE_KEY);
+  }
+
+  private static void doPutAll() {
+    Map<Object, Object> map = generateKeyValues();
+    region.putAll(map, "putAllCallback");
+  }
+
+  private static void doRemoveAll() {
+    Map<Object, Object> map = generateKeyValues();
+    region.removeAll(map.keySet(), "removeAllCallback");
+  }
+  
+  private static void doClear() {
+    region.clear();
+  }
+
+  private static void primeStep1(int cnt) {
+    primeStep1Latch(cnt);
+  }
+  
+  private static void primeStep2(int cnt) {
+    primeStep2Latch(cnt);
+  }
+
+  private static void releaseStep1() {
+    decrementStep1Latch();
+  }
+
+  SerializableRunnable performPutOperation = new SerializableRunnable("perform PUT") {
+    @Override
+    public void run() {
+      try {
+        invokePut(opsVM);
+      } catch (Exception e) {
+        fail("while performing PUT", e);
+      }
+    }
+  };
+
+  SerializableRunnable performNoAckPutOperation = new SerializableRunnable("perform NoAckPUT")
{
+    @Override
+    public void run() throws InterruptedException {
+      Runnable putThread1 = new Runnable() {
+        public void run() {
+          DistributedSystem.setThreadsSocketPolicy(false);
+          doPut();
+          DistributedSystem.releaseThreadsSockets();
+        }
+      };
+      
+      Runnable putThread2 = new Runnable() {
+        public void run() {
+          DistributedSystem.setThreadsSocketPolicy(false);
+          awaitStep1Latch();        
+          doClear();
+          DistributedSystem.releaseThreadsSockets();
+        }
+      };
+      
+      Thread t1 = new Thread(putThread1);
+      Thread t2 = new Thread(putThread2);
+      t2.start();
+      t1.start();
+      t1.join();
+      t2.join();
+    }
+  };
+
+  SerializableRunnable performRemoveOperation = new SerializableRunnable("perform REMOVE")
{
+    @Override
+    public void run() {
+      try {
+        invokePut(opsVM);
+        invokeRemove(opsVM);
+      } catch (Exception e) {
+        fail("while performing REMOVE", e);
+      }
+    }
+  };
+
+  SerializableRunnable performInvalidateOperation = new SerializableRunnable("perform INVALIDATE")
{
+    @Override
+    public void run() {
+      try {
+        invokePut(opsVM);
+        invokeInvalidate(opsVM);
+      } catch (Exception e) {
+        fail("while performing INVALIDATE", e);
+      }
+    }
+  };
+
+  SerializableRunnable performPutAllOperation = new SerializableRunnable("perform PUTALL")
{
+    @Override
+    public void run() {
+      try {
+        invokePutAll(opsVM);
+      } catch (Exception e) {
+        fail("while performing PUTALL", e);
+      }
+    }
+  };
+
+  SerializableRunnable performRemoveAllOperation = new SerializableRunnable("perform REMOVEALL")
{
+    @Override
+    public void run() {
+      try {
+        invokePutAll(opsVM);
+        invokeRemoveAll(opsVM);
+      } catch (Exception e) {
+        fail("while performing REMOVEALL", e);
+      }
+    }
+  };
+  
+  // helper methods
+
+  private void setOpAndClearVM(VM opsTarget, VM clearTarget) {
+    opsVM = opsTarget;
+    clearVM = clearTarget;
+  }
+  
+  private void setupMembers() {    
+    Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+    vm0ID = createCache(vm0);
+    vm1ID = createCache(vm1);
+    String testName =  getName();    
+    vm0.invoke(() -> createRegion(testName));
+    vm1.invoke(() -> createRegion(testName));
+  }
+
+  private void setupNoAckMembers() {    
+    Host host = Host.getHost(0);
+    vm0 = host.getVM(0);
+    vm1 = host.getVM(1);
+    vm0ID = createNoConserveSocketsCache(vm0);
+    vm1ID = createNoConserveSocketsCache(vm1);
+    String testName =  getName();    
+    vm0.invoke(() -> createNOACKRegion(testName));
+    vm1.invoke(() -> createNOACKRegion(testName));
+  }
+
+  private void runConsistencyTest(VM vm, SerializableRunnableIF theTest) {
+    vm.invoke(theTest);
+  }
+
+  private void checkForConsistencyErrors() {
+    Map<Object, Object> r0Contents = (Map<Object, Object>)vm0.invoke(() ->
getRegionContents());
+    Map<Object, Object> r1Contents = (Map<Object, Object>)vm1.invoke(() ->
getRegionContents());
+
+    String key = THE_KEY;
+    softly.assertThat(r1Contents.get(key)).as("region contents are not consistent for key
%s", key).isEqualTo(r0Contents.get(key));
+    softly.assertThat(checkRegionEntry(vm1, key)).as("region entries are not consistent for
key %s", key).isEqualTo(checkRegionEntry(vm0, key));
+
+    for (int subi=1; subi<3; subi++) {
+      String subkey = key + "-" + subi;
+      if (r0Contents.containsKey(subkey)) {
+        softly.assertThat(r1Contents.get(subkey)).as("region contents are not consistent
for key %s", subkey).isEqualTo(r0Contents.get(subkey));
+      } else {
+        softly.assertThat(r1Contents).as("expected containsKey for %s to return false", subkey).doesNotContainKey(subkey);
+      }
+    }
+  }
+
+  public void resetHook() {
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(null);
+  }
+
+  public void setBasicHook(VM whichVM) {
+    theOtherVM = whichVM;
+    theHook = new ArmBasicClearHook();
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook);
+  }
+
+  public void setRemoveAndInvalidateHook(VM whichVM) {
+    theOtherVM = whichVM;
+    theHook = new ArmRemoveAndInvalidateClearHook();
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook);
+  }
+
+  public void setRemoteNoAckHook(VM whichVM) {
+    theOtherVM = whichVM;
+    theHook = new ArmNoAckRemoteHook();
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook);
+  }
+  
+  public void setLocalNoAckHook(VM whichVM) {
+    theOtherVM = whichVM;
+    theHook = new ArmNoAckLocalHook();
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook);
+  }
+  
+  public void setBulkHook(VM whichVM) {
+    theOtherVM = whichVM;
+    theHook = new ArmBulkClearHook();
+    ((AbstractRegionMap) region.entries).setARMLockTestHook(theHook);
+  }
+  
+  private InternalDistributedMember createCache(VM vm) {
+    return (InternalDistributedMember) vm.invoke(new SerializableCallable<Object>()
{
+      public Object call() {
+        cache = getCache(new CacheFactory().set("conserve-sockets", "true"));
+        return getSystem().getDistributedMember();
+      }
+    });
+  }
+
+  private InternalDistributedMember createNoConserveSocketsCache(VM vm) {
+    return (InternalDistributedMember) vm.invoke(new SerializableCallable<Object>()
{
+      public Object call() {
+        cache = getCache(new CacheFactory().set("conserve-sockets", "false"));
+        return getSystem().getDistributedMember();
+      }
+    });
+  }
+
+  private static void createRegion(String rgnName) {
+    RegionFactory<Object, Object> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setConcurrencyChecksEnabled(true);
+    rf.setScope(Scope.DISTRIBUTED_ACK);
+    region = (LocalRegion)rf.create(rgnName);
+  }
+
+  private static void createNOACKRegion(String rgnName) {
+    RegionFactory<Object, Object> rf = cache.createRegionFactory(RegionShortcut.REPLICATE);
+    rf.setConcurrencyChecksEnabled(true);
+    rf.setScope(Scope.DISTRIBUTED_NO_ACK);
+    region = (LocalRegion)rf.create(rgnName);
+  }
+
+  private static Map<Object, Object> generateKeyValues() {
+    String key = THE_KEY;
+    String value = THE_VALUE;
+    Map<Object, Object> map = new HashMap<>();
+    map.put(key, value);
+    map.put(key+"-1", value+"-1");
+    map.put(key+"-2", value+"-2");
+    return map;
+  }
+
+  @SuppressWarnings("rawtypes")
+  private static Map<Object, Object> getRegionContents() {
+    Map<Object, Object> result = new HashMap<>();
+    for (Iterator i=region.entrySet().iterator(); i.hasNext(); ) {
+      Region.Entry e = (Region.Entry)i.next();
+      result.put(e.getKey(), e.getValue());
+    }
+    return result;
+  }
+
+  private boolean checkRegionEntry(VM vm, String key) {
+    boolean target = vm.invoke(() -> doesRegionEntryExist(key)); 
+    return target;
+  }
+  
+  static VM theOtherVM;
+  transient static CountDownLatch step1Latch, step2Latch;
+
+  public static void primeStep1Latch(int waitCount) {
+    step1Latch = new CountDownLatch(waitCount);
+  }
+  
+  public static void awaitStep1Latch() {
+    try {
+      step1Latch.await();
+    } catch (InterruptedException e) {}
+  }
+  
+  public static void decrementStep1Latch() {
+    step1Latch.countDown();
+  }
+
+  public static void decrementRemoteStep1Latch() {
+    theOtherVM.invoke(() -> decrementStep1Latch());
+  }
+  
+  public static void primeStep2Latch(int waitCount) {
+    step2Latch = new CountDownLatch(waitCount);
+  }
+  
+  public static void awaitStep2Latch() {
+    try {
+      step2Latch.await();
+    } catch (InterruptedException e) {}
+  }
+  
+  public static void decrementStep2Latch() {
+    step2Latch.countDown();
+  }
+
+  public static void decrementRemoteStep2Latch() {
+    theOtherVM.invoke(() -> decrementStep2Latch());
+  }
+  /*
+   * Test callback class used to hook the rvv locking mechanism with basic operations.
+   */
+  public static class ArmBasicClearHook extends ARMLockTestHookAdapter {    
+    @Override
+    public void afterRelease(LocalRegion owner, CacheEvent event) {      
+      if((event.getOperation().isCreate()) && owner.getName().startsWith("test"))
{
+        invokeClear(theOtherVM);
+      }
+    }
+  }
+  
+  /*
+   * Test callback class used to hook the rvv locking mechanism with basic operations.
+   */
+  public static class ArmRemoveAndInvalidateClearHook extends ARMLockTestHookAdapter {  
 
+    
+    @Override
+    public void afterRelease(LocalRegion owner, CacheEvent event) {      
+      if((event.getOperation().isDestroy() || 
+          event.getOperation().isInvalidate()) &&
+          owner.getName().startsWith("test")) {
+        invokeClear(theOtherVM);
+      }
+    }
+  }
+
+  /*
+   * Test callback class used to hook the rvv locking mechanism for NOACK testing.
+   */
+  public static class ArmNoAckRemoteHook extends ARMLockTestHookAdapter {
+    @Override
+    public void beforeLock(LocalRegion owner, CacheEvent event) {
+      if(event.isOriginRemote() && event.getOperation().isCreate() && owner.getName().startsWith("test"))
{
+        theOtherVM.invoke(() -> releaseStep1());   // start clear
+        awaitStep2Latch();  // wait for clear to complete
+      }
+    }
+  }
+  
+  public static class ArmNoAckLocalHook extends ARMLockTestHookAdapter {
+    @Override
+    public void beforeStateFlushWait() {
+      decrementRemoteStep2Latch();
+    }
+  }
+  
+  /*
+   * Test callback class used to hook the rvv locking mechanism with bulk operations.
+   */
+  public static class ArmBulkClearHook extends ARMLockTestHookAdapter {    
+    @Override
+    public void afterBulkRelease(LocalRegion region) {
+      invokeClear(theOtherVM);
+    }
+  }
+}


Mime
View raw message