geode-commits mailing list archives

From dschnei...@apache.org
Subject [5/5] incubator-geode git commit: refactored DiskEntry helper code that updates stats
Date Thu, 04 Aug 2016 00:28:39 GMT
refactored DiskEntry helper code that updates stats
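The refactoring extracts the body of each synchronized block into a helper
(basicUpdate, basicRemoveFromDisk) that returns an AsyncDiskEntry, or null,
instead of communicating through a handful of mutable locals (scheduleAsync,
callRemoveFromDisk, tag); the caller then schedules the async write only after
the disk-region read lock is released. Below is a minimal self-contained
sketch of that lock-then-schedule shape. All names are hypothetical stand-ins
for illustration; only update/basicUpdate/scheduleAsyncWrite/AsyncDiskEntry
correspond to names in the diff, and DeferredWrite plays the AsyncDiskEntry
role:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    class LockThenSchedule {
      // Stand-in for AsyncDiskEntry: work deferred until the lock is released.
      static final class DeferredWrite {
        final String payload;
        DeferredWrite(String payload) { this.payload = payload; }
      }

      private final ReadWriteLock drLock = new ReentrantReadWriteLock();
      private final Object syncObj = new Object();

      public void update(String newValue) {
        DeferredWrite task;
        drLock.readLock().lock();
        try {
          synchronized (syncObj) {
            // every mutation and stats update happens under the lock ...
            task = basicUpdate(newValue);
          }
        } finally {
          drLock.readLock().unlock();
        }
        if (task != null) {
          scheduleAsyncWrite(task); // ... but scheduling stays outside the sync
        }
      }

      // Returns the deferred work instead of flagging it in shared locals.
      private DeferredWrite basicUpdate(String newValue) {
        return (newValue == null) ? null : new DeferredWrite(newValue);
      }

      private void scheduleAsyncWrite(DeferredWrite task) {
        System.out.println("scheduling async write of " + task.payload);
      }
    }

Returning the deferred work from the helper keeps the lock scope obvious and
avoids doing the scheduling, which may block or take other locks, while the
read lock is still held.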


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/48ce52aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/48ce52aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/48ce52aa

Branch: refs/heads/feature/GEODE-1714
Commit: 48ce52aa2ca29bd52c13a170733ff8c3dd2e227a
Parents: b93aba1
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Wed Aug 3 17:28:01 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Wed Aug 3 17:28:01 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/DiskEntry.java       | 440 ++++++++++---------
 .../gemstone/gemfire/internal/cache/Token.java  |   3 +
 2 files changed, 225 insertions(+), 218 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/48ce52aa/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 78cea8c..45bd340 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -29,6 +29,7 @@ import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
+import com.gemstone.gemfire.internal.cache.Token.Tombstone;
 import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
 import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
 import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
@@ -453,8 +454,9 @@ public interface DiskEntry extends RegionEntry {
         else {
          entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r,
               re.getValue(), false));
-          // TODO: re.getValue() can be INVALID or TOMBSTONE. In those cases inVM should not be inced.
-          updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
+          if (!Tombstone.isInvalidOrTombstone(re.getValue())) {
+            updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
+          }
         }
       }
       else {
@@ -462,8 +464,7 @@ public interface DiskEntry extends RegionEntry {
         if (did != null) {
           did.setKeyId(DiskRegion.INVALID_ID);
         }
-        // TODO: the value can be INVALID or TOMBSTONE in which case it should not be inVM
-        if (newValue != null && !Token.isRemovedFromDisk(newValue)) {
+        if (newValue != null && !Token.isInvalidOrRemoved(newValue)) {
           updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
         }
       }
@@ -830,12 +831,8 @@ public interface DiskEntry extends RegionEntry {
       //to add an entry to the async queue for every update to maintain the RVV
       boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
       
-      Token oldValue = null;
-      int oldValueLength = 0;
-      boolean scheduleAsync = false;
-      boolean callRemoveFromDisk = false;
+      AsyncDiskEntry asyncDiskEntry = null;
       DiskId did = entry.getDiskId();
-      VersionTag tag = null;
       Object syncObj = did;
       if (syncObj == null) {
         syncObj = entry;
@@ -844,176 +841,192 @@ public interface DiskEntry extends RegionEntry {
         dr.acquireReadLock();
       }
       try {
-      synchronized (syncObj) {
-        oldValue = entry.getValueAsToken();
-        if (Token.isRemovedFromDisk(newValue)) {
-          if (dr.isBackup()) {
-            dr.testIsRecoveredAndClear(did); // fixes bug 41409
-          }
-          RuntimeException rte = null;
-          try {
-            if (!Token.isRemovedFromDisk(oldValue)) {
-              // removeFromDisk takes care of oldValueLength
-              if (dr.isSync()) {
-                removeFromDisk(entry, region, false);
-              } else {
-                callRemoveFromDisk = true; // do it outside the sync
-              }
-            }
-          } catch (RuntimeException e) {
-            rte = e;
-            throw e;
+        synchronized (syncObj) {
+          asyncDiskEntry = basicUpdate(entry, region, newValue, event, dr, maintainRVV, did);
+        }
+      } finally {
+        if (syncObj == did) {
+          dr.releaseReadLock();
+        }
+      }
+      if (asyncDiskEntry != null && did.isPendingAsync()) {
+        // this needs to be done outside the above sync
+        scheduleAsyncWrite(asyncDiskEntry);
+      }
+    }
+
+    private static AsyncDiskEntry basicUpdate(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event, DiskRegion dr, boolean maintainRVV, DiskId did) throws RegionClearedException {
+      AsyncDiskEntry result = null;
+      Token oldValue;
+      int oldValueLength;
+      oldValue = entry.getValueAsToken();
+      if (Token.isRemovedFromDisk(newValue)) {
+        if (dr.isBackup()) {
+          dr.testIsRecoveredAndClear(did); // fixes bug 41409
+        }
+        boolean caughtCacheClosed = false;
+        try {
+          if (!Token.isRemovedFromDisk(oldValue)) {
+            result = basicRemoveFromDisk(entry, region, false, dr, maintainRVV, did);
           }
-          finally {
-            if (rte != null && (rte instanceof CacheClosedException)) {
-             // 47616: not to set the value to be removedFromDisk since it failed to persist
-            } else {
-              // Asif Ensure that the value is rightly set despite clear so
-              // that it can be distributed correctly
-              entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
-            }
+        } catch (CacheClosedException e) {
+          caughtCacheClosed = true;
+          throw e;
+        } finally {
+          if (caughtCacheClosed) {
+           // 47616: not to set the value to be removedFromDisk since it failed to persist
+          } else {
+            // Asif Ensure that the value is rightly set despite clear so
+            // that it can be distributed correctly
+            entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
           }
         }
-        else if (newValue instanceof RecoveredEntry) {
-          // Now that oplog creates are immediately put in cache
-          // a later oplog modify will get us here
-          RecoveredEntry re = (RecoveredEntry)newValue;
-          long oldKeyId = did.getKeyId();
-          long oldOplogId = did.getOplogId();
-          long newOplogId = re.getOplogId();
-          if (newOplogId != oldOplogId) {
-            did.setOplogId(newOplogId);
-            re.setOplogId(oldOplogId); // so caller knows oldoplog id
+      }
+      else if (newValue instanceof RecoveredEntry) {
+        // Now that oplog creates are immediately put in cache
+        // a later oplog modify will get us here
+        RecoveredEntry re = (RecoveredEntry)newValue;
+        long oldKeyId = did.getKeyId();
+        Object oldValueAsToken = entry.getValueAsToken();
+        long oldOplogId = did.getOplogId();
+        long newOplogId = re.getOplogId();
+        if (newOplogId != oldOplogId) {
+          did.setOplogId(newOplogId);
+          re.setOplogId(oldOplogId); // so caller knows oldoplog id
+        }
+        did.setOffsetInOplog(re.getOffsetInOplog());
+        // id already set
+        did.setUserBits(re.getUserBits());
+        oldValueLength = did.getValueLength();
+        did.setValueLength(re.getValueLength());
+        
+        if (re.getRecoveredKeyId() < 0) {
+          if (!entry.isValueNull()) {
+            entry.handleValueOverflow(region);
+            entry.setValueWithContext(region, null); // fixes bug 41119
           }
-          did.setOffsetInOplog(re.getOffsetInOplog());
-          // id already set
-          did.setUserBits(re.getUserBits());
-          oldValueLength = did.getValueLength();
-          did.setValueLength(re.getValueLength());
-          
-          if (re.getRecoveredKeyId() < 0) {
-            if (!entry.isValueNull()) {
-              entry.handleValueOverflow(region);
-              entry.setValueWithContext(region, null); // fixes bug 41119
+        } else {
+          entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
+        }
+        
+        if (re.getRecoveredKeyId() < 0) { // recovering an entry whose new value is on disk
+          if (oldKeyId >= 0) { // the entry's old value is in vm
+            // TODO: oldKeyId == 0 is the ILLEGAL id; what does that indicate?
+            int inVM = -1;
+            if (Token.isInvalidOrTombstone(oldValueAsToken)) { // but tokens are never in vm
+              inVM = 0;
             }
-          } else {
-            entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
+            updateStats(dr, region, inVM, 1/*OnDisk*/, did.getValueLength());
+          } else { // the entry's old value is also on disk
+            int valueLenDelta = -oldValueLength; // but it is no longer
+            valueLenDelta += did.getValueLength(); // new one is now on disk
+            updateStats(dr, region, 0, 0, valueLenDelta);
           }
-          
-          if (re.getRecoveredKeyId() < 0) {
-            if(oldKeyId >= 0) {
-              // TODO: oldKeyId == 0 is the ILLEGAL id
-              // TODO: if oldValue is INVALID or TOMBSTONE then it was not inVM
-              updateStats(dr, region, -1/*InVM*/, 1/*OnDisk*/, did.getValueLength());
-            }
-          } else {
-            // TODO could be INVALID or a TOMBSTONE in which case should not be inVM or onDisk
-            if(oldKeyId < 0) {
-              updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+        } else { // recovering an entry whose new value is in vm
+          int inVM = 1;
+          if (Token.isInvalidOrTombstone(re.getValue())) { // but tokens never in vm
+            inVM = 0;
+          }
+          if(oldKeyId < 0) { // the entry's old value is on disk
+            updateStats(dr, region, inVM, -1/*OnDisk*/, -oldValueLength);
+          } else { // the entry's old value was in the vm
+            if (inVM == 1 && Token.isInvalidOrTombstone(oldValueAsToken)) {
+              // the old state was not in vm and not on disk. But now we are in vm.
+              updateStats(dr, region, 1, 0, 0);
+            } else if (inVM == 0 && !Token.isInvalidOrTombstone(oldValueAsToken)) {
+              // the old state was in vm and not on disk. But now we are not in vm.
+              updateStats(dr, region, -1, 0, 0);
             }
           }
         }
-        else {
-          //The new value in the entry needs to be set after the disk writing 
-          // has succeeded.
-          
-          //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-          
-          if(did != null && did.isPendingAsync()) {
-            //if the entry was not yet written to disk, we didn't update
-            //the bytes on disk.
-            oldValueLength = 0;
-          } else {
-            oldValueLength = getValueLength(did);
-          }
-          
-          if (dr.isBackup()) {
-            dr.testIsRecoveredAndClear(did); // fixes bug 41409
-            if (dr.isSync()) {
-              //In case of compression the value is being set first
-              if (AbstractRegionEntry.isCompressible(dr, newValue)) {
-                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-                
-                // newValue is prepared and compressed. We can't write compressed values to disk.
-                writeToDisk(entry, region, false, event);
-              } else {
-                writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
-                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-              }
-              
-            } else if (did.isPendingAsync() && !maintainRVV) {
+      }
+      else {
+        //The new value in the entry needs to be set after the disk writing 
+        // has succeeded.
+        
+        //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+        
+        if(did != null && did.isPendingAsync()) {
+          //if the entry was not yet written to disk, we didn't update
+          //the bytes on disk.
+          oldValueLength = 0;
+        } else {
+          oldValueLength = getValueLength(did);
+        }
+        
+        if (dr.isBackup()) {
+          dr.testIsRecoveredAndClear(did); // fixes bug 41409
+          if (dr.isSync()) {
+            //In case of compression the value is being set first
+            if (AbstractRegionEntry.isCompressible(dr, newValue)) {
               entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
               
-              // nothing needs to be done except
-              // fixing up LRU stats
-              // @todo fixup LRU stats if needed
-              // I'm not sure anything needs to be done here.
-              // If we have overflow and it decided to evict this entry
-              // how do we handle that case when we are async?
-              // Seems like the eviction code needs to leave the value
-              // in memory until the pendingAsync is done.
+              // newValue is prepared and compressed. We can't write compressed values to disk.
+              writeToDisk(entry, region, false, event);
             } else {
-              //if the entry is not async, we need to schedule it
-              //for regions with concurrency checks enabled, we add an entry
-              //to the queue for every entry.
-              scheduleAsync = true;
-              did.setPendingAsync(true);
-              VersionStamp stamp = entry.getVersionStamp();
-              if(stamp != null) {
-                tag = stamp.asVersionTag();
-              }
-              entry.setValueWithContext(region, newValue); 
+              writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+              entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
             }
-          } else if (did != null) {
+            
+          } else if (did.isPendingAsync() && !maintainRVV) {
             entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
             
-            // Mark the id as needing to be written
-            // The disk remove that this section used to do caused bug 30961
-            // @todo this seems wrong. How does leaving it on disk fix the bug?
-            did.markForWriting();
-            //did.setValueSerializedSize(0);
-          }else {
-            entry.setValueWithContext(region, newValue);
+            // nothing needs to be done except
+            // fixing up LRU stats
+            // @todo fixup LRU stats if needed
+            // I'm not sure anything needs to be done here.
+            // If we have overflow and it decided to evict this entry
+            // how do we handle that case when we are async?
+            // Seems like the eviction code needs to leave the value
+            // in memory until the pendingAsync is done.
+          } else {
+            //if the entry is not async, we need to schedule it
+            //for regions with concurrency checks enabled, we add an entry
+            //to the queue for every entry.
+            did.setPendingAsync(true);
+            VersionTag tag = null;
+            VersionStamp stamp = entry.getVersionStamp();
+            if(stamp != null) {
+              tag = stamp.asVersionTag();
+            }
+            result = new AsyncDiskEntry(region, entry, tag);
+            entry.setValueWithContext(region, newValue); 
           }
+        } else if (did != null) {
+          entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
           
-          if (Token.isRemovedFromDisk(oldValue)) {
-            // Note we now initialize entries removed and then set their
-            // value once we find no existing entry.
-            // So this is the normal path for a brand new entry.
-            updateStats(dr, region, 1/*InVM*/, 0/*OnDisk*/, 0);
+          // Mark the id as needing to be written
+          // The disk remove that this section used to do caused bug 30961
+          // @todo this seems wrong. How does leaving it on disk fix the bug?
+          did.markForWriting();
+          //did.setValueSerializedSize(0);
+        }else {
+          entry.setValueWithContext(region, newValue);
+        }
+        
+        if (Token.isInvalidOrRemoved(newValue)) {
+          if (oldValue == null) {
+            updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+          } else if (!Token.isInvalidOrRemoved(oldValue)){
+            updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+          } else {
+            // oldValue was also a token which is neither in vm or on disk.
           }
-          
-          if(newValue == Token.TOMBSTONE) {
-            if (oldValue == null) {
-              updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-            } else {
-              updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
-            }
+        } else { // we have a value to put in the vm
+          if (oldValue == null) {
+            updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+          } else if (Token.isInvalidOrRemoved(oldValue)) {
+            updateStats(dr, region, 1/*InVM*/, 0/*OnDisk*/, 0/*overflowBytesOnDisk*/);
           } else {
-            if (oldValue == null) {
-              updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-            } else if(oldValue == Token.TOMBSTONE) {
-              updateStats(dr, region, 1/*InVM*/, 0/*OnDisk*/, 0/*overflowBytesOnDisk*/);
-            }
+            // old value was also in vm so no change
           }
         }
-        if (entry instanceof LRUEntry) {
-          LRUEntry le = (LRUEntry)entry;
-          le.unsetEvicted();          
-        }
-
-      }
-      } finally {
-        if (syncObj == did) {
-          dr.releaseReadLock();
-        }
       }
-      if (callRemoveFromDisk) {
-        removeFromDisk(entry, region, false, oldValue == null, false);
-      } else if (scheduleAsync && did.isPendingAsync()) {
-        // this needs to be done outside the above sync
-        scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+      if (entry instanceof LRUEntry) {
+        LRUEntry le = (LRUEntry)entry;
+        le.unsetEvicted();          
       }
+      return result;
     }
 
     private static int getValueLength(DiskId did) {
@@ -1337,7 +1350,9 @@ public interface DiskEntry extends RegionEntry {
       // a regression with it. Reenable this post 6.5
       //Assert.assertTrue(entry._getValue() == null);
       entry.setValueWithContext((RegionEntryContext) region, preparedValue);
-      updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+      if (!Token.isInvalidOrTombstone(value)) {
+        updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+      }
       return preparedValue;
     }
 
@@ -1527,8 +1542,6 @@ public interface DiskEntry extends RegionEntry {
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
                 if (region.isThisRegionBeingClosedOrDestroyed()) return;
-                // onDisk was already deced so just do the valueLength here
-                updateStats(dr, region, 0/*InVM*/, 0/*OnDisk*/, -did.getValueLength());
                 dr.remove(region, entry, true, false);
                 if (dr.isBackup()) {
                   did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
@@ -1597,10 +1610,6 @@ public interface DiskEntry extends RegionEntry {
      * @see DiskRegion#remove
      */
    public static void removeFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
-      removeFromDisk(entry, region, true, false, isClear);
-    }
-    private static void removeFromDisk(DiskEntry entry, LocalRegion region,
-                                      boolean checkValue, boolean valueWasNull, boolean isClear) throws RegionClearedException {
       DiskRegion dr = region.getDiskRegion();
       
       //If we have concurrency checks enabled for a persistent region, we need
@@ -1608,77 +1617,72 @@ public interface DiskEntry extends RegionEntry {
       boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
       
       DiskId did = entry.getDiskId();
-      VersionTag tag = null;
       Object syncObj = did;
       if (did == null) {
         syncObj = entry;
       }
-      boolean scheduledAsyncHere = false;
+      AsyncDiskEntry asyncDiskEntry = null;
       if (syncObj == did) {
         dr.acquireReadLock();
       }
       try {
-      synchronized (syncObj) { 
-
-        if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
-          // Not on disk yet
-          // TODO: check if old value is TOMBSTONE or INVALID?
-          updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
-          dr.unscheduleAsyncWrite(did);
-          return;
-        } 
-        //Asif: This will convert the -ve OplogKeyId to positive as part of fixing
-        //Bug # 39989
-        did.unmarkForWriting();
-
-        //System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
-        int oldValueLength = 0;
-        if (dr.isSync() || isClear) {
-          oldValueLength = did.getValueLength();
-          dr.remove(region, entry, false, isClear);
-          if (dr.isBackup()) {
-            did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
-          }
-          //If this is a clear, we should unschedule the async write for this
-          //entry
-          did.setPendingAsync(false);
-        } else {
-          if (!did.isPendingAsync() || maintainRVV) {
-            scheduledAsyncHere = true;
-            did.setPendingAsync(true);
-            VersionStamp stamp = entry.getVersionStamp();
-            if(stamp != null) {
-              tag = stamp.asVersionTag();
-            }
-          }
-        }
-        if (checkValue) {
-          valueWasNull = entry.isValueNull();
-          entry._removePhase1();
-        }
-        if (valueWasNull) {
-          updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-        }
-        else {
-          // TODO: if we dec inVM when INVALID/TOMBSTONE then no need to dec it here if oldValue is INVALID/TOMBSTONE
-          updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
-          if (!dr.isSync()) {
-            // we are going to do an async remove of an entry that is not currently
-            // overflowed to disk so we don't want to count its value length as being
-            // on disk when we finally do the async op. So we clear it here.
-            did.setValueLength(0);
-          }
+        synchronized (syncObj) {
+          asyncDiskEntry = basicRemoveFromDisk(entry, region, isClear, dr, maintainRVV, did);
         }
-      }
       } finally {
         if (syncObj == did) {
           dr.releaseReadLock();
         }
       }
-      if (scheduledAsyncHere && did.isPendingAsync()) {
+      if (asyncDiskEntry != null && did.isPendingAsync()) {
         // do this outside the sync
-        scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+        scheduleAsyncWrite(asyncDiskEntry);
+      }
+    }
+
+    private static AsyncDiskEntry basicRemoveFromDisk(DiskEntry entry, LocalRegion region, boolean isClear, DiskRegion dr, boolean maintainRVV, DiskId did) throws RegionClearedException {
+      Object curValAsToken = entry.getValueAsToken();
+      if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
+        // Not on disk yet
+        if (!Token.isInvalidOrRemoved(curValAsToken)) {
+          updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+        }
+        dr.unscheduleAsyncWrite(did);
+        return null;
+      }
+      AsyncDiskEntry result = null;
+      
+      //Asif: This will convert the -ve OplogKeyId to positive as part of fixing
+      //Bug # 39989
+      did.unmarkForWriting();
+
+      //System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
+      int oldValueLength = did.getValueLength();
+      if (dr.isSync() || isClear) {
+        dr.remove(region, entry, false, isClear);
+        if (dr.isBackup()) {
+          did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+        }
+        //If this is a clear, we should unschedule the async write for this
+        //entry
+        did.setPendingAsync(false);
+      } else {
+        if (!did.isPendingAsync() || maintainRVV) {
+          did.setPendingAsync(true);
+          VersionTag tag = null;
+          VersionStamp stamp = entry.getVersionStamp();
+          if(stamp != null) {
+            tag = stamp.asVersionTag();
+          }
+          result = new AsyncDiskEntry(region, entry, tag);
+        }
       }
+      if (curValAsToken == null) {
+        updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+      } else if (!Token.isInvalidOrRemoved(curValAsToken)) {
+        updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+      }
+      return result;
     }
 
     /**

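For reference, updateStats(...) in the code above takes signed deltas for the
region's entriesInVM and entriesOnDisk counters plus overflowBytesOnDisk.
Reading the calls out of this diff, the main transitions are (a summary, not
an exhaustive list):

    transition                                        InVM   OnDisk   bytesOnDisk
    new entry gets a real value (old was removed)      +1       0          0
    value faulted in from disk                         +1      -1     -valueLength
    entry removed while its value was only on disk      0      -1     -valueLength
    real in-VM value replaced by INVALID/TOMBSTONE     -1       0          0

The behavioral fix in this commit is that an INVALID or TOMBSTONE token now
counts in neither column: the InVM increments and decrements are skipped
whenever the old or new "value" is only a token, via the guards that call
Token.isInvalidOrTombstone and Token.isInvalidOrRemoved.
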
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/48ce52aa/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
index 0f731e0..43cbfd7 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/Token.java
@@ -99,6 +99,9 @@ public abstract class Token {
   public static final boolean isInvalidOrRemoved(Object o) {
     return isInvalid(o) || isRemoved(o);
   }
+  public static final boolean isInvalidOrTombstone(Object o) {
+    return isInvalid(o) || o == TOMBSTONE;
+  }
   public static final boolean isInvalid(Object o) {
     return o == INVALID || o == LOCAL_INVALID;
   }
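The new predicate differs from isInvalidOrRemoved just above it: it matches
TOMBSTONE but not the other removed tokens that isRemoved (not shown in this
hunk) covers. A sketch of its intended use in the stats guards earlier in this
commit, where drv, r, entry, and updateStats are as in DiskEntry.Helper and
the local v is illustrative:

    Object v = entry.getValueAsToken();
    if (!Token.isInvalidOrTombstone(v)) {
      // only a real value occupies VM memory, so only then count it as InVM
      updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
    }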

