geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinmeil...@apache.org
Subject [25/32] incubator-geode git commit: refactored currentTombstone code
Date Tue, 05 Jul 2016 23:15:48 GMT
refactored currentTombstone code


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/0861549c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/0861549c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/0861549c

Branch: refs/heads/feature/GEODE-1571
Commit: 0861549c920f66fd45d3d22f6699318fb14a7572
Parents: 161c80f
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Tue Jun 21 16:31:55 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Tue Jul 5 14:30:09 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/TombstoneService.java        | 137 ++++++++++---------
 1 file changed, 74 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/0861549c/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
index 7036d45..5c6b1dd 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/TombstoneService.java
@@ -547,7 +547,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
      */
     volatile boolean batchExpirationSuspended;
     /**
-     * The sweeper thread's current tombstone
+     * The sweeper thread's current tombstone.
+     * Only set by the run() thread while holding the currentTombstoneLock.
+     * Read by other threads while holding the currentTombstoneLock.
      */
     Tombstone currentTombstone;
     /**
@@ -679,13 +681,13 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
       this.batchExpirationInProgress = true;
       boolean batchScheduled = false;
       try {
-        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
         Set<Tombstone> expired = expiredTombstones;
-        long removalSize = 0;
-        expiredTombstones = new HashSet<Tombstone>();
-        if (expired.size() == 0) {
+        if (expired.isEmpty()) {
           return;
         }
+        expiredTombstones = new HashSet<Tombstone>();
+        final Set<DistributedRegion> regionsAffected = new HashSet<DistributedRegion>();
+        long removalSize = 0;
 
         //Update the GC RVV for all of the affected regions.
         //We need to do this so that we can persist the GC RVV before
@@ -762,10 +764,10 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
     public void run() {
       long minimumRetentionMs = this.expiryTime / 10; // forceExpiration will not work on
something younger than this
       long maximumSleepTime = 10000;
+      Tombstone myTombstone = null;
       if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
         logger.trace(LogMarker.TOMBSTONE, "Destroyed entries sweeper starting with default
sleep interval={}", this.expiryTime);
       }
-      currentTombstone = null;
       // millis we need to run a scan of queue and batch set for resurrected tombstones
       long minimumScanTime = 100;
       // how often to perform the scan
@@ -815,64 +817,50 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
               }
             }
           }
-          if (currentTombstone == null) {
-            try {
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = tombstones.remove();
-              } finally {
-                currentTombstoneLock.unlock();
-              }
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", currentTombstone);
-              }
-            } catch (NoSuchElementException e) {
-              // expected
-              if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
-              }
-              forceExpirationCount = 0;
-            }
+          if (myTombstone == null) {
+            myTombstone = setCurrentToNextTombstone();
           }
-          long sleepTime;
-          if (currentTombstone == null) {
+          long sleepTime = 0;
+          boolean expireMyTombstone = false;
+          if (myTombstone == null) {
             sleepTime = expiryTime;
-          } else if (currentTombstone.getVersionTimeStamp()+expiryTime > now &&
(forceExpirationCount <= 0 || (currentTombstone.getVersionTimeStamp() + expiryTime - now)
<= minimumRetentionMs)) {
-            sleepTime = currentTombstone.getVersionTimeStamp()+expiryTime - now;
           } else {
+            long msTillMyTombstoneExpires = myTombstone.getVersionTimeStamp() + expiryTime
- now;
             if (forceExpirationCount > 0) {
-              forceExpirationCount--;
+              if (msTillMyTombstoneExpires > 0 && msTillMyTombstoneExpires <=
minimumRetentionMs) {
+                sleepTime = msTillMyTombstoneExpires;
+              } else {
+                forceExpirationCount--;
+                expireMyTombstone = true;
+              }
+            } else if (msTillMyTombstoneExpires > 0) {
+              sleepTime = msTillMyTombstoneExpires;
+            } else {
+              expireMyTombstone = true;
             }
-            sleepTime = 0;
+          }
+          if (expireMyTombstone) {
             try {
               if (batchMode) {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", myTombstone);
                 }
-                expiredTombstones.add(currentTombstone);
+                expiredTombstones.add(myTombstone);
               } else {
                 if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", currentTombstone);
+                  logger.trace(LogMarker.TOMBSTONE, "removing expired tombstone {}", myTombstone);
                 }
-                queueSize.addAndGet(-currentTombstone.getSize());
-                currentTombstone.region.getRegionMap().removeTombstone(currentTombstone.entry,
currentTombstone, false, true);
-              }
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
+                queueSize.addAndGet(-myTombstone.getSize());
+                myTombstone.region.getRegionMap().removeTombstone(myTombstone.entry, myTombstone,
false, true);
               }
+              myTombstone = null;
+              clearCurrentTombstone();
             } catch (CancelException e) {
               return;
             } catch (Exception e) {
               logger.warn(LocalizedMessage.create(LocalizedStrings.GemFireCacheImpl_TOMBSTONE_ERROR),
e);
-              currentTombstoneLock.lock();
-              try {
-                currentTombstone = null;
-              } finally {
-                currentTombstoneLock.unlock();
-              }
+              myTombstone = null;
+              clearCurrentTombstone();
             }
           }
           if (sleepTime > 0) {
@@ -889,20 +877,16 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
                   if (test.region.getRegionMap().isTombstoneNotNeeded(test.entry, test.getEntryVersion()))
{
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
-                  } else if (batchMode && test != currentTombstone && (test.getVersionTimeStamp()+expiryTime)
<= now) {
+                  } else if (batchMode && test != myTombstone && (test.getVersionTimeStamp()+expiryTime)
<= now) {
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
                     if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
-                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", currentTombstone);
+                      logger.trace(LogMarker.TOMBSTONE, "expiring tombstone {}", test);
                     }
                     expiredTombstones.add(test);
                     sleepTime = 0;
@@ -919,13 +903,9 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
                     }
                     it.remove();
                     this.queueSize.addAndGet(-test.getSize());
-                    if (test == currentTombstone) {
-                      currentTombstoneLock.lock();
-                      try {
-                        currentTombstone = null;
-                      } finally {
-                        currentTombstoneLock.unlock();
-                      }
+                    if (test == myTombstone) {
+                      myTombstone = null;
+                      clearCurrentTombstone();
                       sleepTime = 0;
                     }
                   }
@@ -980,6 +960,37 @@ public class TombstoneService  implements ResourceListener<MemoryEvent>
{
         }
       } // while()
     } // run()
+
+    private void clearCurrentTombstone() {
+      currentTombstoneLock.lock();
+      currentTombstone = null;
+      currentTombstoneLock.unlock();
+    }
+
+    /**
+     * Returns the new currentTombstone taken from the tombstones queue; null if no next
tombstone
+     */
+    private Tombstone setCurrentToNextTombstone() {
+      Tombstone result;
+      currentTombstoneLock.lock();
+      try {
+        result = tombstones.poll();
+        if (result != null) {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "current tombstone is {}", result);
+          }
+          currentTombstone = result;
+        } else {
+          if (logger.isTraceEnabled(LogMarker.TOMBSTONE)) {
+            logger.trace(LogMarker.TOMBSTONE, "queue is empty - will sleep");
+          }
+          forceExpirationCount = 0;
+        }
+      } finally {
+        currentTombstoneLock.unlock();
+      }
+      return result;
+    }
     
   } // class TombstoneSweeper
 


Mime
View raw message