geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinmeil...@apache.org
Subject [4/7] incubator-geode git commit: GEODE-1420: fix intermittent TombstoneService failures
Date Thu, 07 Jul 2016 18:13:25 GMT
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
index a8a512e..9ad8e0e 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/MultiVMRegionTestCase.java
@@ -8542,30 +8542,39 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
     vm3.invoke(() -> this.assertNoClearTimeouts());
   }
   
+  private void checkCCRegionTombstoneCount(String msg, int expected) {
+    int actual = CCRegion.getTombstoneCount();
+    if (expected != actual) {
+      assertEquals(msg + " region tombstone count was " + actual + " expected=" + expected
+ " TombstoneService=" + CCRegion.getCache().getTombstoneService(),
+          expected, actual);
+    }
+  }
   public void versionTestTombstones() {
     disconnectAllFromDS();
     Host host = Host.getHost(0);
     VM vm0 = host.getVM(0);
     VM vm1 = host.getVM(1);
-    final int numEntries = 1000;
+    final int numEntries = 100;
     
     // create replicated regions in VM 0 and 1, then perform concurrent ops
     // on the same key while creating the region in VM2.  Afterward make
     // sure that all three regions are consistent
-    final long oldServerTimeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-    final long oldClientTimeout = TombstoneService.CLIENT_TOMBSTONE_TIMEOUT;
-    final long oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+    final long oldServerTimeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
+    final long oldClientTimeout = TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT;
+    final int oldExpiredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
     final boolean oldIdleExpiration = TombstoneService.IDLE_EXPIRATION;
     final double oldLimit = TombstoneService.GC_MEMORY_THRESHOLD;
+    final long oldMaxSleepTime = TombstoneService.MAX_SLEEP_TIME;
     try {
       SerializableRunnable setTimeout = new SerializableRunnable() {
         @Override
         public void run() {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = 20000;
-          TombstoneService.CLIENT_TOMBSTONE_TIMEOUT = 19000;
-          TombstoneService.EXPIRED_TOMBSTONE_LIMIT = 1000;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = 1000;
+          TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT = 900;
+          TombstoneService.EXPIRED_TOMBSTONE_LIMIT = numEntries;
           TombstoneService.IDLE_EXPIRATION = true;
           TombstoneService.GC_MEMORY_THRESHOLD = 0;  // turn this off so heap profile won't
cause test to fail
+          TombstoneService.MAX_SLEEP_TIME = 500;
         }
       };
       vm0.invoke(setTimeout);
@@ -8601,8 +8610,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
               assertTrue("entry should not exist", !CCRegion.containsKey("cckey"+i));
               assertTrue("entry should not contain a value", !CCRegion.containsValueForKey("cckey"+i));
             }
-            long count = CCRegion.getTombstoneCount();
-            assertEquals("expected "+numEntries+" tombstones", numEntries, count);
+            checkCCRegionTombstoneCount("after destroys in this vm ", numEntries);
             assertTrue("region should not contain a tombstone", !CCRegion.containsValue(Token.TOMBSTONE));
             if (CCRegion.getScope().isDistributedNoAck()) {
               sendSerialMessageToAll(); // flush the ops
@@ -8616,25 +8624,20 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
       vm1.invoke(new SerializableRunnable("check tombstone count(2)") {
         @Override
         public void run() {
-          final long count = CCRegion.getTombstoneCount();
-          assertEquals("expected "+numEntries+" tombstones", numEntries, count);
-          // ensure that some GC is performed - due to timing it may not
-          // be the whole batch, but some amount should be done
+          checkCCRegionTombstoneCount("after destroys in other vm ", numEntries);
           WaitCriterion waitForExpiration = new WaitCriterion() {
             @Override
             public boolean done() {
-              // TODO: in GEODE-561 this was changed to no longer wait for it
-              // to go to zero. But I think it should.
-              return CCRegion.getTombstoneCount() < numEntries;
+              return CCRegion.getTombstoneCount() == 0;
             }
             @Override
             public String description() {
-              return "Waiting for some tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-                + " tombstones left out of " + count + " initial tombstones";
+              return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
+              + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
             }
           };
           try {
-            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT+10000,
1000, true);
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT+(TombstoneService.MAX_SLEEP_TIME*9),
100, true);
           } catch (AssertionError e) {
             CCRegion.dumpBackingMap();
             com.gemstone.gemfire.test.dunit.LogWriterUtils.getLogWriter().info("tombstone
service state: " + CCRegion.getCache().getTombstoneService());
@@ -8643,16 +8646,10 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
         }
       });
 
-      // Now check to see if tombstones are resurrected by a put/create.
-      // The entries should be created okay and the callback should be afterCreate.
-      // The tombstone count won't go down until the entries are swept, but then
-      // the count should fall to zero.
-
       vm0.invoke(new SerializableRunnable("create/destroy entries and check tombstone count")
{
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          final long origCount = count;
+          final int origCount = CCRegion.getTombstoneCount();
           try {
             WaitCriterion waitForExpiration = new WaitCriterion() {
               @Override
@@ -8662,18 +8659,17 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
               @Override
               public String description() {
                 return "Waiting for all tombstones to expire.  There are now " + CCRegion.getTombstoneCount()
-                  + " tombstones left out of " + origCount + " initial tombstones";
+                  + " tombstones left out of " + origCount + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
               }
             };
-            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT+10000,
1000, true);
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT+(TombstoneService.MAX_SLEEP_TIME*9),
100, true);
             logger.debug("creating tombstones.  current count={}", CCRegion.getTombstoneCount());
             for (int i=0; i<numEntries; i++) {
               CCRegion.create("cckey" + i, i);
               CCRegion.destroy("cckey" + i);
             }
             logger.debug("done creating tombstones.  current count={}", CCRegion.getTombstoneCount());
-            count = CCRegion.getTombstoneCount();
-            assertEquals("expected "+numEntries+" tombstones", numEntries, count);
+            checkCCRegionTombstoneCount("after create+destroy in this vm ", numEntries);
             assertEquals(0, CCRegion.size());
             afterCreates = 0;
             AttributesMutator m = CCRegion.getAttributesMutator();
@@ -8699,8 +8695,7 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase {
       vm1.invoke(new SerializableRunnable("check tombstone count and install listener") {
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected ten tombstones", numEntries, count);
+          checkCCRegionTombstoneCount("after create+destroy in other vm ", numEntries);
           afterCreates = 0;
           AttributesMutator m = CCRegion.getAttributesMutator();
           m.addCacheListener(new CacheListenerAdapter() {
@@ -8711,6 +8706,11 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
           });
         }});
 
+      // Now check to see if tombstones are resurrected by a create.
+      // The entries should be created okay and the callback should be afterCreate.
+      // The tombstone count won't go down until the entries are swept, but then
+      // the count should fall to zero.
+
       vm0.invoke(new SerializableRunnable("create entries and check afterCreate and tombstone
count") {
         @Override
         public void run() {
@@ -8718,13 +8718,24 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
             for (int i=0; i<numEntries; i++) {
               CCRegion.create("cckey" + i, i);
             }
-            long count = CCRegion.getTombstoneCount();
-            assertEquals("expected zero tombstones", 0, count);
+            checkCCRegionTombstoneCount("after create in this vm", 0);
             assertEquals("expected "+numEntries+" afterCreates", numEntries, afterCreates);
             assertEquals(numEntries, CCRegion.size());
             if (CCRegion.getScope().isDistributedNoAck()) {
               sendSerialMessageToAll(); // flush the ops
             }
+            WaitCriterion waitForExpiration = new WaitCriterion() {
+              @Override
+              public boolean done() {
+                return CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
==  0;
+              }
+              @Override
+              public String description() {
+                return "Waiting for all scheduled tombstones to be removed.  There are now
" + CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
+                  + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
+              }
+            };
+            Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT*5,
100, true);
           } catch (CacheException e) {
             fail("while performing create operations", e);
           }
@@ -8734,30 +8745,34 @@ public abstract class MultiVMRegionTestCase extends RegionTestCase
{
       vm1.invoke(new SerializableRunnable("check afterCreate and tombstone count") {
         @Override
         public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected zero tombstones", 0, count);
+          checkCCRegionTombstoneCount("after create in other vm", 0);
           assertEquals("expected "+numEntries+" afterCreates", numEntries, afterCreates);
           assertEquals(numEntries, CCRegion.size());
+          WaitCriterion waitForExpiration = new WaitCriterion() {
+            @Override
+            public boolean done() {
+              return CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
==  0;
+            }
+            @Override
+            public String description() {
+              return "Waiting for all scheduled tombstones to be removed.  There are now
" + CCRegion.getCache().getTombstoneService().getScheduledTombstoneCount()
+                + " tombstones left out of " + numEntries + " initial tombstones. " + CCRegion.getCache().getTombstoneService();
+            }
+          };
+          Wait.waitForCriterion(waitForExpiration, TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT*5,
100, true);
         }
       });
 
-      vm0.invoke(new SerializableRunnable("check region size and tombstone count") {
-        @Override
-        public void run() {
-          long count = CCRegion.getTombstoneCount();
-          assertEquals("expected all tombstones to be expired", 0, count);
-          assertEquals(numEntries, CCRegion.size());
-        }
-      });
     } finally {
       SerializableRunnable resetTimeout = new SerializableRunnable() {
         @Override
         public void run() {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = oldServerTimeout;
-          TombstoneService.CLIENT_TOMBSTONE_TIMEOUT = oldClientTimeout;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = oldServerTimeout;
+          TombstoneService.NON_REPLICATE_TOMBSTONE_TIMEOUT = oldClientTimeout;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = oldExpiredTombstoneLimit;
           TombstoneService.IDLE_EXPIRATION = oldIdleExpiration;
           TombstoneService.GC_MEMORY_THRESHOLD = oldLimit;
+          TombstoneService.MAX_SLEEP_TIME = oldMaxSleepTime;
         }
       };
       vm0.invoke(resetTimeout);

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
index c92a436..32cedd1 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/GIIDeltaDUnitTest.java
@@ -352,7 +352,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
    * create some exception list.
    * Before GII, P's RVV is P6,R6(3-6), R's RVV is P6,R6, RVVGC are both P4,R0   
    * vm1 becomes offline then restarts.
-   * The deltaGII should send delta which only contains unfinished opeation R4,R5  
+   * The deltaGII should send delta which only contains unfinished operation R4,R5  
    */
   @Test
   public void testDeltaGIIWithOnlyUnfinishedOp() throws Throwable {
@@ -2086,7 +2086,7 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
   protected void changeTombstoneTimout(VM vm, final long value) {
     SerializableRunnable change = new SerializableRunnable() {
       public void run() {
-        TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = value;
+        TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = value;
       }
     };
     vm.invoke(change);
@@ -2493,13 +2493,13 @@ public class GIIDeltaDUnitTest extends JUnit4CacheTestCase {
           assertTrue(entry != null && entry.getRegionEntry().isTombstone());
         }
         
-        System.out.println("GGG:new timeout="+TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT);
+        System.out.println("GGG:new timeout="+TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT);
         if (entry == null || !entry.getRegionEntry().isTombstone()) {
           return (false == expectExist);
         } else {
           long ts = entry.getRegionEntry().getVersionStamp().getVersionTimeStamp();
           if (expectExpired) {
-            return (ts + TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl)cache).cacheTimeMillis());
// use MAX_WAIT as timeout
+            return (ts + TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT <= ((GemFireCacheImpl)cache).cacheTimeMillis());
// use MAX_WAIT as timeout
           } else {
             return (true == expectExist);
           }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
index bdadd8a..1870a19 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionDelayedRecoveryDUnitTest.java
@@ -191,6 +191,8 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
     //create the region in a third VM, which won't have any buckets
     vm2.invoke(createPrRegions);
 
+    final long begin = System.currentTimeMillis();
+
     //close 1 cache, which should make the bucket drop below
     //the expected redundancy level.
     vm1.invoke(new SerializableRunnable("close cache") {
@@ -200,7 +202,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
       }
     });
     
-    long elapsed = waitForBucketRecovery(vm2, 1);
+    long elapsed = waitForBucketRecovery(vm2, 1, begin);
     assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed,
elapsed >= 5000);
   }
 
@@ -262,7 +264,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
             + elapsed, elapsed < 5000);
     
     //wait for the bucket to be copied
-    elapsed = waitForBucketRecovery(vm2, 4);
+    elapsed = waitForBucketRecovery(vm2, 4, begin);
     assertTrue("Did not wait at least 5 seconds to create the bucket. Elapsed=" + elapsed,
elapsed >= 5000);
     
     vm2.invoke(new SerializableCallable("wait for primary move") {
@@ -280,8 +282,7 @@ public class PartitionedRegionDelayedRecoveryDUnitTest extends JUnit4CacheTestCa
     });
   }
   
-  private long waitForBucketRecovery(VM vm2, final int numBuckets) {
-    final long begin = System.currentTimeMillis();
+  private long waitForBucketRecovery(VM vm2, final int numBuckets, final long begin) {
     //wait for the bucket to be copied
     Long elapsed = (Long) vm2.invoke(new SerializableCallable("putData") {
       public Object call() {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
index 3eeddf5..1851ae3 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/TombstoneCreationJUnitTest.java
@@ -175,7 +175,7 @@ public class TombstoneCreationJUnitTest {
     // the entry
     String key = "destroyedKey1";
     VersionedThinRegionEntryHeap entry = new VersionedThinRegionEntryHeapObjectKey(region,
key, Token.REMOVED_PHASE1);
-    entry.setLastModified(System.currentTimeMillis() - (2 * TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT));
+    entry.setLastModified(System.currentTimeMillis() - (2 * TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT));
     ((AbstractRegionMap)region.getRegionMap()).putEntryIfAbsentForTest(entry);
     cache.getLogger().info("entry inserted into cache: " + entry);
 
@@ -185,7 +185,7 @@ public class TombstoneCreationJUnitTest {
     tag.setIsRemoteForTesting();
     tag.setEntryVersion(3);
     tag.setRegionVersion(12345);
-    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT);
+    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT);
     tag.setDistributedSystemId(1);
     ev.setVersionTag(tag);
     cache.getLogger().info("trying to destroy the entry: " + region.getRegionEntry(key));
@@ -209,7 +209,7 @@ public class TombstoneCreationJUnitTest {
     tag.setIsRemoteForTesting();
     tag.setEntryVersion(1);
     tag.setRegionVersion(12340);
-    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT
- 10000);
+    tag.setVersionTimeStamp(System.currentTimeMillis() - TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT
- 10000);
     tag.setDistributedSystemId(1);
     ev.setVersionTag(tag);
     cache.getLogger().info("trying to update the entry with an older event: " + region.getRegionEntry(key));

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b9da9e66/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
index f7c011d..a7b15fd 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/persistence/PersistentRVVRecoveryDUnitTest.java
@@ -266,9 +266,8 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
       
       @Override
       public void run2() throws CacheException {
-        // TODO Auto-generated method stub
-        long replicatedTombstoneTomeout = TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT;
-        long expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
+        long replicatedTombstoneTomeout = TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT;
+        int expiriredTombstoneLimit = TombstoneService.EXPIRED_TOMBSTONE_LIMIT;
         
         try {
           LocalRegion region = createRegion(vm0);
@@ -303,7 +302,7 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
           // right away when they are gIId based on their original timestamp.
           Wait.pause((int) TEST_REPLICATED_TOMBSTONE_TIMEOUT);
 
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = TEST_REPLICATED_TOMBSTONE_TIMEOUT;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = TEST_REPLICATED_TOMBSTONE_TIMEOUT;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = entryCount;
           // Do region GII
           region = createRegion(vm0);
@@ -335,7 +334,7 @@ public class PersistentRVVRecoveryDUnitTest extends PersistentReplicatedTestBase
 
           cache.close();
         } finally {
-          TombstoneService.REPLICATED_TOMBSTONE_TIMEOUT = replicatedTombstoneTomeout;
+          TombstoneService.REPLICATE_TOMBSTONE_TIMEOUT = replicatedTombstoneTomeout;
           TombstoneService.EXPIRED_TOMBSTONE_LIMIT = expiriredTombstoneLimit;
         }    
       }


Mime
View raw message