geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject [18/24] incubator-geode git commit: GEODE-1714: fix disk stats
Date Tue, 16 Aug 2016 21:34:06 GMT
GEODE-1714: fix disk stats

A number of places were found that did not correctly
change the DiskRegionStats: entriesInVM and entriesOnlyOnDisk.
Also found places that did not correctly change
the entry count and size of a PR bucket region.
EntriesInVM is supposed to count only entries that have a value
stored in the JVM, but it was also counting invalid entries.
Since invalid entries do not have a value, this stat no longer counts them.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/ed85e8eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/ed85e8eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/ed85e8eb

Branch: refs/heads/feature/GEODE-1691
Commit: ed85e8ebaea20a6505548c3745a6fb8434310c4b
Parents: a1d2fa2
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Thu Jul 28 11:55:11 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Mon Aug 15 16:33:53 2016 -0700

----------------------------------------------------------------------
 .../internal/cache/AbstractLRURegionMap.java    |  11 +-
 .../internal/cache/AbstractRegionMap.java       |  34 +-
 .../gemfire/internal/cache/DiskEntry.java       | 583 +++++++++----------
 .../gemfire/internal/cache/LocalRegion.java     |  20 +
 .../gemfire/cache30/DiskRegionDUnitTest.java    |  31 +-
 .../cache/DiskRegRecoveryJUnitTest.java         |  33 +-
 6 files changed, 373 insertions(+), 339 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
index c9b3038..af9a389 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractLRURegionMap.java
@@ -293,11 +293,18 @@ public abstract class AbstractLRURegionMap extends AbstractRegionMap {
         }
 
         // Do the following check while synchronized to fix bug 31761
-        if (entry.isInvalidOrRemoved()) {
+        Token entryVal = entry.getValueAsToken();
+        if (entryVal == null) {
+          if (logger.isTraceEnabled(LogMarker.LRU)) {
+            logger.trace(LogMarker.LRU, "no need to evict already evicted key={}", entry.getKey());
+          }
+          return 0;
+        }
+        if (Token.isInvalidOrRemoved(entryVal)) {
           // no need to evict these; it will not save any space
           // and the destroyed token needs to stay in memory
           if (logger.isTraceEnabled(LogMarker.LRU)) {
-            logger.trace(LogMarker.LRU, "no need to evict invalid/localInvalid/destroyed token for key={}", entry.getKey());
+            logger.trace(LogMarker.LRU, "no need to evict {} token for key={}", entryVal, entry.getKey());
           }
           return 0;
         }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
index 226e194..faa8580 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/AbstractRegionMap.java
@@ -565,13 +565,19 @@ public abstract class AbstractRegionMap implements RegionMap {
             continue;
           }
           RegionEntry newRe = getEntryFactory().createEntry((RegionEntryContext) _getOwnerObject(), key, value);
+          // TODO: passing value to createEntry causes a problem with the disk stats.
+          //   The disk stats have already been set to track oldRe.
+          //   So when we call createEntry we probably want to give it REMOVED_PHASE1
+          //   and then set the value in copyRecoveredEntry in a way that does not
+          //   change the disk stats. This also depends on DiskEntry.Helper.initialize not changing the stats for REMOVED_PHASE1
           copyRecoveredEntry(oldRe, newRe);
           // newRe is now in this._getMap().
           if (newRe.isTombstone()) {
             VersionTag tag = newRe.getVersionStamp().asVersionTag();
             tombstones.put(tag, newRe);
+          } else {
+            _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
           }
-          _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
           incEntryCount(1);
           lruEntryUpdate(newRe);
         } finally {
@@ -583,20 +589,20 @@ public abstract class AbstractRegionMap implements RegionMap {
         lruUpdateCallback();
       }
     } else {
-      incEntryCount(size());
       for (Iterator<RegionEntry> iter = regionEntries().iterator(); iter.hasNext(); ) {
         RegionEntry re = iter.next();
         if (re.isTombstone()) {
           if (re.getVersionStamp() == null) { // bug #50992 - recovery from versioned to non-versioned
-            incEntryCount(-1);
             iter.remove();
             continue;
           } else {
             tombstones.put(re.getVersionStamp().asVersionTag(), re);
           }
+        } else {
+          _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
         }
-        _getOwner().updateSizeOnCreate(re.getKey(), _getOwner().calculateRegionEntryValueSize(re));
       }
+      incEntryCount(size());
       // Since lru was not being done during recovery call it now.
       lruUpdateCallback();
     }
@@ -657,12 +663,13 @@ public abstract class AbstractRegionMap implements RegionMap {
         } // synchronized
       }
       if (_isOwnerALocalRegion()) {
-        _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
         if (newRe.isTombstone()) {
           // refresh the tombstone so it doesn't time out too soon
           _getOwner().scheduleTombstone(newRe, newRe.getVersionStamp().asVersionTag());
+        } else {
+          _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(newRe));
         }
-        
+        // incEntryCount is called for a tombstone because scheduleTombstone does entryCount--.
         incEntryCount(1); // we are creating an entry that was recovered from disk including tombstone
       }
       lruEntryUpdate(newRe);
@@ -692,16 +699,25 @@ public abstract class AbstractRegionMap implements RegionMap {
       }
       try {
         if (_isOwnerALocalRegion()) {
-          if (re.isTombstone()) {
+          boolean oldValueWasTombstone = re.isTombstone();
+          if (oldValueWasTombstone) {
             // when a tombstone is to be overwritten, unschedule it first
             _getOwner().unscheduleTombstone(re);
+            // unscheduleTombstone incs entryCount which is ok
+            // because we either set the value after this so that
+            // the entry exists or we call scheduleTombstone which
+            // will dec entryCount.
           }
           final int oldSize = _getOwner().calculateRegionEntryValueSize(re);
           re.setValue(_getOwner(), value); // OFFHEAP no need to call AbstractRegionMap.prepareValueForCache because setValue is overridden for disk and that code takes apart value (RecoveredEntry) and prepares its nested value for the cache
           if (re.isTombstone()) {
             _getOwner().scheduleTombstone(re, re.getVersionStamp().asVersionTag());
+            _getOwner().updateSizeOnRemove(key, oldSize);
+          } else if (oldValueWasTombstone) {
+            _getOwner().updateSizeOnCreate(key, _getOwner().calculateRegionEntryValueSize(re));
+          } else {
+            _getOwner().updateSizeOnPut(key, oldSize, _getOwner().calculateRegionEntryValueSize(re));
           }
-          _getOwner().updateSizeOnPut(key, oldSize, _getOwner().calculateRegionEntryValueSize(re));
         } else {
           DiskEntry.Helper.updateRecoveredEntry((PlaceHolderDiskRegion)_getOwnerObject(),
               (DiskEntry)re, value, (RegionEntryContext) _getOwnerObject());
@@ -867,6 +883,7 @@ public abstract class AbstractRegionMap implements RegionMap {
                       }
                     }
                     if (newValue == Token.TOMBSTONE) {
+                      owner.updateSizeOnRemove(key, oldSize);
                       if (owner.getServerProxy() == null &&
                           owner.getVersionVector().isTombstoneTooOld(entryVersion.getMemberID(), entryVersion.getRegionVersion())) {
                         // the received tombstone has already been reaped, so don't retain it
@@ -1628,6 +1645,7 @@ public abstract class AbstractRegionMap implements RegionMap {
         // generate versions and Tombstones for destroys
         boolean dispatchListenerEvent = inTokenMode;
         boolean opCompleted = false;
+        // TODO: if inTokenMode then Token.DESTROYED is ok but what about !inTokenMode because owner.concurrencyChecksEnabled? In that case we do not want a DESTROYED token.
         RegionEntry newRe = getEntryFactory().createEntry(owner, key,
             Token.DESTROYED);
         if ( oqlIndexManager != null) {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 98ee729..144ffe4 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -29,6 +29,7 @@ import com.gemstone.gemfire.internal.ByteArrayDataInput;
 import com.gemstone.gemfire.internal.HeapDataOutputStream;
 import com.gemstone.gemfire.internal.Version;
 import com.gemstone.gemfire.internal.cache.DiskStoreImpl.AsyncDiskEntry;
+import com.gemstone.gemfire.internal.cache.Token.Tombstone;
 import com.gemstone.gemfire.internal.cache.lru.EnableLRU;
 import com.gemstone.gemfire.internal.cache.lru.LRUClockNode;
 import com.gemstone.gemfire.internal.cache.lru.LRUEntry;
@@ -438,14 +439,7 @@ public interface DiskEntry extends RegionEntry {
         throw new IllegalArgumentException(LocalizedStrings.DiskEntry_DISK_REGION_IS_NULL.toLocalizedString());
       }
 
-      if (newValue == null || Token.isRemovedFromDisk(newValue)) {
-        // it is not in vm and it is not on disk
-        DiskId did = entry.getDiskId();
-        if (did != null) {
-          did.setKeyId(DiskRegion.INVALID_ID);
-        }
-      }
-      else if (newValue instanceof RecoveredEntry) {
+      if (newValue instanceof RecoveredEntry) {
         // Set the id directly, the value will also be set if RECOVER_VALUES
         RecoveredEntry re = (RecoveredEntry)newValue;
         DiskId did = entry.getDiskId();
@@ -455,15 +449,14 @@ public interface DiskEntry extends RegionEntry {
         did.setUserBits(re.getUserBits());
         did.setValueLength(re.getValueLength());
         if (re.getRecoveredKeyId() < 0) {
-          drv.incNumOverflowOnDisk(1L);
-          drv.incNumOverflowBytesOnDisk(did.getValueLength());
-          incrementBucketStats(r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength());
+          updateStats(drv, r, 0/*InVM*/, 1/*OnDisk*/, did.getValueLength());
         }
         else {
           entry.setValueWithContext(drv, entry.prepareValueForCache((RegionEntryContext) r,
               re.getValue(), false));
-          drv.incNumEntriesInVM(1L);
-          incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
+          if (!Tombstone.isInvalidOrRemoved(re.getValue())) {
+            updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
+          }
         }
       }
       else {
@@ -471,8 +464,9 @@ public interface DiskEntry extends RegionEntry {
         if (did != null) {
           did.setKeyId(DiskRegion.INVALID_ID);
         }
-        drv.incNumEntriesInVM(1L);
-        incrementBucketStats(r, 1/*InVM*/, 0/*OnDisk*/, 0);
+        if (newValue != null && !Token.isInvalidOrRemoved(newValue)) {
+          updateStats(drv, r, 1/*InVM*/, 0/*OnDisk*/, 0);
+        }
       }
     }
     
@@ -828,21 +822,13 @@ public interface DiskEntry extends RegionEntry {
      * @throws RegionClearedException
      */
     public static void update(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException {
-      DiskRegion dr = region.getDiskRegion();
       if (newValue == null) {
         throw new NullPointerException(LocalizedStrings.DiskEntry_ENTRYS_VALUE_SHOULD_NOT_BE_NULL.toLocalizedString());
       }
       
-      //If we have concurrency checks enabled for a persistent region, we need
-      //to add an entry to the async queue for every update to maintain the RVV
-      boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
-      
-      Token oldValue = null;
-      int oldValueLength = 0;
-      boolean scheduleAsync = false;
-      boolean callRemoveFromDisk = false;
+      AsyncDiskEntry asyncDiskEntry = null;
+      DiskRegion dr = region.getDiskRegion();
       DiskId did = entry.getDiskId();
-      VersionTag tag = null;
       Object syncObj = did;
       if (syncObj == null) {
         syncObj = entry;
@@ -851,188 +837,193 @@ public interface DiskEntry extends RegionEntry {
         dr.acquireReadLock();
       }
       try {
-      synchronized (syncObj) {
-        oldValue = entry.getValueAsToken();
-        if (Token.isRemovedFromDisk(newValue)) {
-          if (dr.isBackup()) {
-            dr.testIsRecoveredAndClear(did); // fixes bug 41409
-          }
-          RuntimeException rte = null;
-          try {
-            if (!Token.isRemovedFromDisk(oldValue)) {
-              // removeFromDisk takes care of oldValueLength
-              if (dr.isSync()) {
-                removeFromDisk(entry, region, false);
-              } else {
-                callRemoveFromDisk = true; // do it outside the sync
-              }
-            }
-          } catch (RuntimeException e) {
-            rte = e;
-            throw e;
+        synchronized (syncObj) {
+          asyncDiskEntry = basicUpdate(entry, region, newValue, event);
+        }
+      } finally {
+        if (syncObj == did) {
+          dr.releaseReadLock();
+        }
+      }
+      if (asyncDiskEntry != null && did.isPendingAsync()) {
+        // this needs to be done outside the above sync
+        scheduleAsyncWrite(asyncDiskEntry);
+      }
+    }
+
+    private static AsyncDiskEntry basicUpdate(DiskEntry entry, LocalRegion region, Object newValue, EntryEventImpl event) throws RegionClearedException {
+      AsyncDiskEntry result = null;
+      DiskRegion dr = region.getDiskRegion();
+      DiskId did = entry.getDiskId();
+      Token oldValue;
+      int oldValueLength;
+      oldValue = entry.getValueAsToken();
+      if (Token.isRemovedFromDisk(newValue)) {
+        if (dr.isBackup()) {
+          dr.testIsRecoveredAndClear(did); // fixes bug 41409
+        }
+        boolean caughtCacheClosed = false;
+        try {
+          if (!Token.isRemovedFromDisk(oldValue)) {
+            result = basicRemoveFromDisk(entry, region, false);
           }
-          finally {
-            if (rte != null && (rte instanceof CacheClosedException)) {
-             // 47616: not to set the value to be removedFromDisk since it failed to persist
-            } else {
-              // Asif Ensure that the value is rightly set despite clear so
-              // that it can be distributed correctly
-              entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
-            }
+        } catch (CacheClosedException e) {
+          caughtCacheClosed = true;
+          throw e;
+        } finally {
+          if (caughtCacheClosed) {
+           // 47616: not to set the value to be removedFromDisk since it failed to persist
+          } else {
+            // Asif Ensure that the value is rightly set despite clear so
+            // that it can be distributed correctly
+            entry.setValueWithContext(region, newValue); // OFFHEAP newValue was already preparedForCache
           }
         }
-        else if (newValue instanceof RecoveredEntry) {
-          // Now that oplog creates are immediately put in cache
-          // a later oplog modify will get us here
-          RecoveredEntry re = (RecoveredEntry)newValue;
-          long oldKeyId = did.getKeyId();
-          long oldOplogId = did.getOplogId();
-          long newOplogId = re.getOplogId();
-          if (newOplogId != oldOplogId) {
-            did.setOplogId(newOplogId);
-            re.setOplogId(oldOplogId); // so caller knows oldoplog id
+      }
+      else if (newValue instanceof RecoveredEntry) {
+        // Now that oplog creates are immediately put in cache
+        // a later oplog modify will get us here
+        RecoveredEntry re = (RecoveredEntry)newValue;
+        long oldKeyId = did.getKeyId();
+        Object oldValueAsToken = entry.getValueAsToken();
+        long oldOplogId = did.getOplogId();
+        long newOplogId = re.getOplogId();
+        if (newOplogId != oldOplogId) {
+          did.setOplogId(newOplogId);
+          re.setOplogId(oldOplogId); // so caller knows oldoplog id
+        }
+        did.setOffsetInOplog(re.getOffsetInOplog());
+        // id already set
+        did.setUserBits(re.getUserBits());
+        oldValueLength = did.getValueLength();
+        did.setValueLength(re.getValueLength());
+        
+        if (re.getRecoveredKeyId() < 0) {
+          if (!entry.isValueNull()) {
+            entry.handleValueOverflow(region);
+            entry.setValueWithContext(region, null); // fixes bug 41119
           }
-          did.setOffsetInOplog(re.getOffsetInOplog());
-          // id already set
-          did.setUserBits(re.getUserBits());
-          oldValueLength = did.getValueLength();
-          did.setValueLength(re.getValueLength());
-          
-          if (re.getRecoveredKeyId() < 0) {
-            if (!entry.isValueNull()) {
-              entry.handleValueOverflow(region);
-              entry.setValueWithContext(region, null); // fixes bug 41119
+        } else {
+          entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
+        }
+        
+        if (re.getRecoveredKeyId() < 0) { // recovering an entry whose new value is on disk
+          if (oldKeyId >= 0) { // the entry's old value is in vm
+            // TODO: oldKeyId == 0 is the ILLEGAL id; what does that indicate?
+            int inVM = -1;
+            if (Token.isInvalidOrRemoved(oldValueAsToken)) { // but tokens are never in vm
+              inVM = 0;
             }
-          } else {
-            entry.setValueWithContext(region, entry.prepareValueForCache(region, re.getValue(), false));
+            updateStats(dr, region, inVM, 1/*OnDisk*/, did.getValueLength());
+          } else { // the entry's old value is also on disk
+            int valueLenDelta = -oldValueLength; // but it is no longer
+            valueLenDelta += did.getValueLength(); // new one is now on disk
+            updateStats(dr, region, 0, 0, valueLenDelta);
           }
-          
-          if (re.getRecoveredKeyId() < 0) {
-            if(oldKeyId >= 0) {
-              dr.incNumEntriesInVM(-1L);
-              dr.incNumOverflowOnDisk(1L);
-              dr.incNumOverflowBytesOnDisk(did.getValueLength());
-              incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
-                                   did.getValueLength());
-            }
-          } else {
-            if(oldKeyId < 0) {
-              dr.incNumEntriesInVM(1L);
-              dr.incNumOverflowOnDisk(-1L);
-              dr.incNumOverflowBytesOnDisk(-oldValueLength);
-              incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+        } else { // recovering an entry whose new value is in vm
+          int inVM = 1;
+          if (Token.isInvalidOrRemoved(re.getValue())) { // but tokens never in vm
+            inVM = 0;
+          }
+          if(oldKeyId < 0) { // the entry's old value is on disk
+            updateStats(dr, region, inVM, -1/*OnDisk*/, -oldValueLength);
+          } else { // the entry's old value was in the vm
+            if (inVM == 1 && Token.isInvalidOrRemoved(oldValueAsToken)) {
+              // the old state was not in vm and not on disk. But now we are in vm.
+              updateStats(dr, region, 1, 0, 0);
+            } else if (inVM == 0 && !Token.isInvalidOrRemoved(oldValueAsToken)) {
+              // the old state was in vm and not on disk. But now we are not in vm.
+              updateStats(dr, region, -1, 0, 0);
             }
           }
         }
-        else {
-          //The new value in the entry needs to be set after the disk writing 
-          // has succeeded.
-          
-          //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-          
-          if(did != null && did.isPendingAsync()) {
-            //if the entry was not yet written to disk, we didn't update
-            //the bytes on disk.
-            oldValueLength = 0;
-          } else {
-            oldValueLength = getValueLength(did);
-          }
-          
-          if (dr.isBackup()) {
-            dr.testIsRecoveredAndClear(did); // fixes bug 41409
-            if (dr.isSync()) {
+      }
+      else {
+        //The new value in the entry needs to be set after the disk writing 
+        // has succeeded.
+        
+        //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+        
+        if(did != null && did.isPendingAsync()) {
+          //if the entry was not yet written to disk, we didn't update
+          //the bytes on disk.
+          oldValueLength = 0;
+        } else {
+          oldValueLength = getValueLength(did);
+        }
+        
+        if (dr.isBackup()) {
+          dr.testIsRecoveredAndClear(did); // fixes bug 41409
+          if (dr.isSync()) {
+            if (AbstractRegionEntry.isCompressible(dr, newValue)) {
               //In case of compression the value is being set first
-              if (AbstractRegionEntry.isCompressible(dr, newValue)) {
-                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-                
-                // newValue is prepared and compressed. We can't write compressed values to disk.
-                writeToDisk(entry, region, false, event);
-              } else {
-                writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
-                entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-              }
-              
-            } else if (did.isPendingAsync() && !maintainRVV) {
+              //so that writeToDisk can get it back from the entry
+              //decompressed if it does not have it already in the event.
+              // TODO: this may have introduced a bug with clear since
+              //       writeToDisk can throw RegionClearedException which
+              //       was supposed to stop us from changing entry.
               entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-              
-              // nothing needs to be done except
-              // fixing up LRU stats
-              // @todo fixup LRU stats if needed
-              // I'm not sure anything needs to be done here.
-              // If we have overflow and it decided to evict this entry
-              // how do we handle that case when we are async?
-              // Seems like the eviction code needs to leave the value
-              // in memory until the pendingAsync is done.
+              // newValue is prepared and compressed. We can't write compressed values to disk.
+              writeToDisk(entry, region, false, event);
             } else {
+              writeBytesToDisk(entry, region, false, createValueWrapper(newValue, event));
+              entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+            }
+            
+          } else {
+            //If we have concurrency checks enabled for a persistent region, we need
+            //to add an entry to the async queue for every update to maintain the RVV
+            boolean maintainRVV = region.concurrencyChecksEnabled;
+            
+            if (!did.isPendingAsync() || maintainRVV) {
               //if the entry is not async, we need to schedule it
               //for regions with concurrency checks enabled, we add an entry
               //to the queue for every entry.
-              scheduleAsync = true;
               did.setPendingAsync(true);
+              VersionTag tag = null;
               VersionStamp stamp = entry.getVersionStamp();
               if(stamp != null) {
                 tag = stamp.asVersionTag();
               }
-              entry.setValueWithContext(region, newValue); 
+              result = new AsyncDiskEntry(region, entry, tag);
             }
-          } else if (did != null) {
             entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
-            
-            // Mark the id as needing to be written
-            // The disk remove that this section used to do caused bug 30961
-            // @todo this seems wrong. How does leaving it on disk fix the bug?
-            did.markForWriting();
-            //did.setValueSerializedSize(0);
-          }else {
-            entry.setValueWithContext(region, newValue);
           }
+        } else if (did != null) {
+          entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
           
-          if (Token.isRemovedFromDisk(oldValue)) {
-            // Note we now initialize entries removed and then set their
-            // value once we find no existing entry.
-            // So this is the normal path for a brand new entry.
-            dr.incNumEntriesInVM(1L);
-            incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0);
+          // Mark the id as needing to be written
+          // The disk remove that this section used to do caused bug 30961
+          // @todo this seems wrong. How does leaving it on disk fix the bug?
+          did.markForWriting();
+          //did.setValueSerializedSize(0);
+        }else {
+          entry.setValueWithContext(region, newValue);
+        }
+        
+        if (Token.isInvalidOrRemoved(newValue)) {
+          if (oldValue == null) {
+            updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+          } else if (!Token.isInvalidOrRemoved(oldValue)){
+            updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+          } else {
+            // oldValue was also a token which is neither in vm or on disk.
           }
-          
-          if(newValue == Token.TOMBSTONE) {
-            if (oldValue == null) {
-              dr.incNumOverflowOnDisk(-1L);
-              dr.incNumOverflowBytesOnDisk(-oldValueLength);
-              incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-            } else {
-              dr.incNumEntriesInVM(-1L);
-              incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
-            }
+        } else { // we have a value to put in the vm
+          if (oldValue == null) {
+            updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+          } else if (Token.isInvalidOrRemoved(oldValue)) {
+            updateStats(dr, region, 1/*InVM*/, 0/*OnDisk*/, 0/*overflowBytesOnDisk*/);
           } else {
-            if (oldValue == null) {
-              dr.incNumEntriesInVM(1L);
-              dr.incNumOverflowOnDisk(-1L);
-              dr.incNumOverflowBytesOnDisk(-oldValueLength);
-              incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-            } else if(oldValue == Token.TOMBSTONE) {
-              dr.incNumEntriesInVM(1L);
-              incrementBucketStats(region, 1/*InVM*/, 0/*OnDisk*/, 0/*overflowBytesOnDisk*/);
-            }
+            // old value was also in vm so no change
           }
         }
-        if (entry instanceof LRUEntry) {
-          LRUEntry le = (LRUEntry)entry;
-          le.unsetEvicted();          
-        }
-
-      }
-      } finally {
-        if (syncObj == did) {
-          dr.releaseReadLock();
-        }
       }
-      if (callRemoveFromDisk) {
-        removeFromDisk(entry, region, false, oldValue == null, false);
-      } else if (scheduleAsync && did.isPendingAsync()) {
-        // this needs to be done outside the above sync
-        scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+      if (entry instanceof LRUEntry) {
+        LRUEntry le = (LRUEntry)entry;
+        le.unsetEvicted();          
       }
+      return result;
     }
 
     private static int getValueLength(DiskId did) {
@@ -1054,7 +1045,8 @@ public interface DiskEntry extends RegionEntry {
       }
       DiskId did = entry.getDiskId();
       synchronized (did) {
-        boolean oldValueWasNull = entry.isValueNull();
+        Object oldValueAsToken = entry.getValueAsToken();
+        boolean oldValueWasNull = oldValueAsToken == null;
         int oldValueLength = did.getValueLength();
         // Now that oplog creates are immediately put in cache
         // a later oplog modify will get us here
@@ -1071,24 +1063,36 @@ public interface DiskEntry extends RegionEntry {
         if (newValue.getRecoveredKeyId() >= 0) {
           entry.setValueWithContext(context, entry.prepareValueForCache(drv, newValue.getValue(), 
               false));
+          int inVM = 1;
+          if (Token.isInvalidOrRemoved(newValue.getValue())) { // but tokens never in vm
+            inVM = 0;
+          }
+          if(oldValueWasNull) { // the entry's old value is on disk
+            updateStats(drv, null, inVM, -1/*OnDisk*/, -oldValueLength);
+          } else { // the entry's old value was in the vm
+            if (inVM == 1 && Token.isInvalidOrRemoved(oldValueAsToken)) {
+              // the old state was not in vm and not on disk. But now we are in vm.
+              updateStats(drv, null, 1, 0, 0);
+            } else if (inVM == 0 && !Token.isInvalidOrRemoved(oldValueAsToken)) {
+              // the old state was in vm and not on disk. But now we are not in vm.
+              updateStats(drv, null, -1, 0, 0);
+            }
+          }
         } else {
           if (!oldValueWasNull) {
             entry.handleValueOverflow(context);
             entry.setValueWithContext(context,null); // fixes bug 41119
           }
-        }
-        if (entry instanceof LRUEntry) {
-          LRUEntry le = (LRUEntry)entry;
-          assert !le.testEvicted();
-          // we don't allow eviction during recovery
-          if (oldValueWasNull) {
-            // Note we do not append this entry because that will be
-            // done by lruEntryUpdate
-            drv.incNumEntriesInVM(1L);
-            drv.incNumOverflowOnDisk(-1L);
-            drv.incNumOverflowBytesOnDisk(-oldValueLength);
-            //No need to call incrementBucketStats here because we don't have
-            //a real bucket region, this is during recovery from disk.
+          if (!oldValueWasNull) { // the entry's old value is in vm
+            int inVM = -1;
+            if (Token.isInvalidOrRemoved(oldValueAsToken)) { // but tokens are never in vm
+              inVM = 0;
+            }
+            updateStats(drv, null, inVM, 1/*OnDisk*/, did.getValueLength());
+          } else { // the entry's old value is also on disk
+            int valueLenDelta = -oldValueLength; // but it is no longer
+            valueLenDelta += did.getValueLength(); // new one is now on disk
+            updateStats(drv, null, 0, 0, valueLenDelta);
           }
         }
       }
@@ -1356,10 +1360,9 @@ public interface DiskEntry extends RegionEntry {
       // a regression with it. Reenable this post 6.5
       //Assert.assertTrue(entry._getValue() == null);
       entry.setValueWithContext((RegionEntryContext) region, preparedValue);
-      dr.incNumEntriesInVM(1L);
-      dr.incNumOverflowOnDisk(-1L);
-      dr.incNumOverflowBytesOnDisk(-bytesOnDisk);
-      incrementBucketStats(region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+      if (!Token.isInvalidOrRemoved(value)) {
+        updateStats(dr, region, 1/*InVM*/, -1/*OnDisk*/, -bytesOnDisk);
+      }
       return preparedValue;
     }
 
@@ -1382,17 +1385,31 @@ public interface DiskEntry extends RegionEntry {
       return valueBytes;
     }
 
-    public static void incrementBucketStats(Object owner,
-                                             int entriesInVmDelta,
-                                             int overflowOnDiskDelta,
-                                             int overflowBytesOnDiskDelta) {
-      if (owner instanceof BucketRegion) {
-        ((BucketRegion)owner).incNumEntriesInVM(entriesInVmDelta);
-        ((BucketRegion)owner).incNumOverflowOnDisk(overflowOnDiskDelta);
-        ((BucketRegion)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
-      } else if (owner instanceof DiskRegionView) {
-        ((DiskRegionView)owner).incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
+    public static void updateStats(DiskRegionView drv, Object owner,
+        int entriesInVmDelta,
+        int overflowOnDiskDelta,
+        int overflowBytesOnDiskDelta) {
+      if (entriesInVmDelta != 0) {
+        drv.incNumEntriesInVM(entriesInVmDelta);
+      }
+      if (overflowOnDiskDelta != 0) {
+        drv.incNumOverflowOnDisk(overflowOnDiskDelta);
+      }
+      if (overflowBytesOnDiskDelta != 0) {
+        drv.incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
       }
+      if (owner instanceof BucketRegion) {
+        BucketRegion br = (BucketRegion) owner;
+        br.incNumEntriesInVM(entriesInVmDelta);
+        br.incNumOverflowOnDisk(overflowOnDiskDelta);
+        br.incNumOverflowBytesOnDisk(overflowBytesOnDiskDelta);
+      }
+      // Note: we used to call owner.incNumOverflowBytesOnDisk()
+      // if owner was a DiskRegionView.
+      // But since we also call drv.incNumOverflowBytesOnDisk()
+      // and since drv is == owner when owner is not a LocalRegion
+      // (see PlaceHolderDiskRegion.getDiskRegionView())
+      // this resulted in incNumOverflowBytesOnDisk being called twice.
     }
 
     /**
@@ -1408,14 +1425,6 @@ public interface DiskEntry extends RegionEntry {
      * @throws RegionClearedException
      */
     public static int overflowToDisk(DiskEntry entry, LocalRegion region, EnableLRU ccHelper) throws RegionClearedException {
-      {
-        Token entryVal = entry.getValueAsToken();
-        if (entryVal == null || Token.isRemovedFromDisk(entryVal)) {
-          // Note it could be removed token now because
-          // freeAllEntriesOnDisk is not able to sync on entry
-          return 0;
-        }
-      }
       DiskRegion dr = region.getDiskRegion();
       final int oldSize = region.calculateRegionEntryValueSize(entry);;
       //Asif:Get diskID . If it is null, it implies it is
@@ -1432,7 +1441,7 @@ public interface DiskEntry extends RegionEntry {
       dr.acquireReadLock();
       try {
       synchronized (did) {
-        // check for a concurrent freeAllEntriesOnDisk
+        // check for a concurrent freeAllEntriesOnDisk which syncs on DiskId but not on the entry
         if (entry.isRemovedFromDisk()) {
           return 0;
         }
@@ -1450,7 +1459,6 @@ public interface DiskEntry extends RegionEntry {
             // and now we are faulting it out
           }
         }
-        boolean movedValueToDisk = false; // added for bug 41849
         
         // If async then if it does not need to be written (because it already was)
         // then treat it like the sync case. This fixes bug 41310
@@ -1462,18 +1470,10 @@ public interface DiskEntry extends RegionEntry {
           region.updateSizeOnEvict(entry.getKey(), oldSize);
           entry.handleValueOverflow(region);
           entry.setValueWithContext(region,null);
-          movedValueToDisk = true;
           change = ((LRUClockNode)entry).updateEntrySize(ccHelper);
-        }
-        int valueLength = 0;
-        if (movedValueToDisk) {
-          valueLength = getValueLength(did);
-        }
-        if(dr.isSync() || movedValueToDisk) {
-          dr.incNumEntriesInVM(-1L);
-          dr.incNumOverflowOnDisk(1L);
-          dr.incNumOverflowBytesOnDisk(valueLength);
-          incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/, valueLength);
+          // the caller checked to make sure we had something to overflow
+          // so dec inVM and inc onDisk
+          updateStats(dr, region, -1/*InVM*/, 1/*OnDisk*/, getValueLength(did));
         }
       }
       } finally {
@@ -1552,10 +1552,6 @@ public interface DiskEntry extends RegionEntry {
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
                 if (region.isThisRegionBeingClosedOrDestroyed()) return;
-                // onDisk was already deced so just do the valueLength here
-                dr.incNumOverflowBytesOnDisk(-did.getValueLength());
-                incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-                                     -did.getValueLength());
                 dr.remove(region, entry, true, false);
                 if (dr.isBackup()) {
                   did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
@@ -1573,11 +1569,7 @@ public interface DiskEntry extends RegionEntry {
                     && ((LRUEntry)entry).testEvicted()) {
                   // Moved this here to fix bug 40116.
                   region.updateSizeOnEvict(entry.getKey(), entryValSize);
-                  dr.incNumEntriesInVM(-1);
-                  dr.incNumOverflowOnDisk(1L);
-                  dr.incNumOverflowBytesOnDisk(did.getValueLength());
-                  incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
-                                       did.getValueLength());
+                  updateStats(dr, region, -1/*InVM*/, 1/*OnDisk*/, did.getValueLength());
                   entry.handleValueOverflow(region);
                   entry.setValueWithContext(region,null);
                 }
@@ -1628,90 +1620,79 @@ public interface DiskEntry extends RegionEntry {
      * @see DiskRegion#remove
      */
     public static void removeFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
-      removeFromDisk(entry, region, true, false, isClear);
-    }
-    private static void removeFromDisk(DiskEntry entry, LocalRegion region,
-                                      boolean checkValue, boolean valueWasNull, boolean isClear) throws RegionClearedException {
       DiskRegion dr = region.getDiskRegion();
-      
-      //If we have concurrency checks enabled for a persistent region, we need
-      //to add an entry to the async queue for every update to maintain the RVV
-      boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
-      
       DiskId did = entry.getDiskId();
-      VersionTag tag = null;
       Object syncObj = did;
       if (did == null) {
         syncObj = entry;
       }
-      boolean scheduledAsyncHere = false;
+      AsyncDiskEntry asyncDiskEntry = null;
       if (syncObj == did) {
         dr.acquireReadLock();
       }
       try {
-      synchronized (syncObj) { 
-
-        if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
-          // Not on disk yet
-          dr.incNumEntriesInVM(-1L);
-          incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
-          dr.unscheduleAsyncWrite(did);
-          return;
-        } 
-        //Asif: This will convert the -ve OplogKeyId to positive as part of fixing
-        //Bug # 39989
-        did.unmarkForWriting();
-
-        //System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
-        int oldValueLength = 0;
-        if (dr.isSync() || isClear) {
-          oldValueLength = did.getValueLength();
-          dr.remove(region, entry, false, isClear);
-          if (dr.isBackup()) {
-            did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
-          }
-          //If this is a clear, we should unschedule the async write for this
-          //entry
-          did.setPendingAsync(false);
-        } else {
-          if (!did.isPendingAsync() || maintainRVV) {
-            scheduledAsyncHere = true;
-            did.setPendingAsync(true);
-            VersionStamp stamp = entry.getVersionStamp();
-            if(stamp != null) {
-              tag = stamp.asVersionTag();
-            }
-          }
-        }
-        if (checkValue) {
-          valueWasNull = entry.isValueNull();
-          entry._removePhase1();
-        }
-        if (valueWasNull) {
-          dr.incNumOverflowOnDisk(-1L);
-          dr.incNumOverflowBytesOnDisk(-oldValueLength);
-          incrementBucketStats(region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
-        }
-        else {
-          dr.incNumEntriesInVM(-1L);
-          incrementBucketStats(region, -1/*InVM*/, 0/*OnDisk*/, 0);
-          if (!dr.isSync()) {
-            // we are going to do an async remove of an entry that is not currently
-            // overflowed to disk so we don't want to count its value length as being
-            // on disk when we finally do the async op. So we clear it here.
-            did.setValueLength(0);
-          }
+        synchronized (syncObj) {
+          asyncDiskEntry = basicRemoveFromDisk(entry, region, isClear);
         }
-      }
       } finally {
         if (syncObj == did) {
           dr.releaseReadLock();
         }
       }
-      if (scheduledAsyncHere && did.isPendingAsync()) {
+      if (asyncDiskEntry != null && did.isPendingAsync()) {
         // do this outside the sync
-        scheduleAsyncWrite(new AsyncDiskEntry(region, entry, tag));
+        scheduleAsyncWrite(asyncDiskEntry);
+      }
+    }
+
+    private static AsyncDiskEntry basicRemoveFromDisk(DiskEntry entry, LocalRegion region, boolean isClear) throws RegionClearedException {
+      final DiskRegion dr = region.getDiskRegion();
+      final DiskId did = entry.getDiskId();
+      final Object curValAsToken = entry.getValueAsToken();
+      if (did == null || (dr.isBackup() && did.getKeyId()== DiskRegion.INVALID_ID)) {
+        // Not on disk yet
+        if (!Token.isInvalidOrRemoved(curValAsToken)) {
+          updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+        }
+        dr.unscheduleAsyncWrite(did);
+        return null;
       }
+      AsyncDiskEntry result = null;
+      
+      //Asif: This will convert the -ve OplogKeyId to positive as part of fixing
+      //Bug # 39989
+      did.unmarkForWriting();
+
+      //System.out.println("DEBUG: removeFromDisk doing remove(" + id + ")");
+      int oldValueLength = did.getValueLength();
+      if (dr.isSync() || isClear) {
+        dr.remove(region, entry, false, isClear);
+        if (dr.isBackup()) {
+          did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
+        }
+        //If this is a clear, we should unschedule the async write for this
+        //entry
+        did.setPendingAsync(false);
+      } else {
+        //If we have concurrency checks enabled for a persistent region, we need
+        //to add an entry to the async queue for every update to maintain the RVV
+        boolean maintainRVV = region.concurrencyChecksEnabled && dr.isBackup();
+        if (!did.isPendingAsync() || maintainRVV) {
+          did.setPendingAsync(true);
+          VersionTag tag = null;
+          VersionStamp stamp = entry.getVersionStamp();
+          if(stamp != null) {
+            tag = stamp.asVersionTag();
+          }
+          result = new AsyncDiskEntry(region, entry, tag);
+        }
+      }
+      if (curValAsToken == null) {
+        updateStats(dr, region, 0/*InVM*/, -1/*OnDisk*/, -oldValueLength);
+      } else if (!Token.isInvalidOrRemoved(curValAsToken)) {
+        updateStats(dr, region, -1/*InVM*/, 0/*OnDisk*/, 0);
+      }
+      return result;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
index 46ccd47..73dc5ab 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/LocalRegion.java
@@ -12741,4 +12741,24 @@ public class LocalRegion extends AbstractRegion
     
   }
   
+  public int testHookGetValuesInVM() {
+    int result = 0;
+    for (RegionEntry re: getRegionMap().regionEntries()) {
+      if (re.getValueAsToken() == Token.NOT_A_TOKEN) {
+        result++;
+      }
+    }
+    return result;
+  }
+  
+  public int testHookGetValuesOnDisk() {
+    int result = 0;
+    for (RegionEntry re: getRegionMap().regionEntries()) {
+      if (re.getValueAsToken() == null) {
+        result++;
+      }
+    }
+    return result;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/test/java/com/gemstone/gemfire/cache30/DiskRegionDUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DiskRegionDUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DiskRegionDUnitTest.java
index a43950e..e89e16d 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/cache30/DiskRegionDUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/cache30/DiskRegionDUnitTest.java
@@ -1160,33 +1160,40 @@ public class DiskRegionDUnitTest extends JUnit4CacheTestCase {
     assertEquals(total - 1, diskStats.getNumEntriesInVM());
     assertEquals(5, diskStats.getNumOverflowOnDisk());
 
-    // Make sure invalidate doesn't change anything
+    // Make sure invalidate of inVM entry changes inVM count but not disk
     region.invalidate(new Integer(total+10));
-    assertEquals(region.entryCount(), diskStats.getNumEntriesInVM() +
+    assertEquals(region.entryCount()-1, diskStats.getNumEntriesInVM() +
                  diskStats.getNumOverflowOnDisk());
-    assertEquals(total - 1, diskStats.getNumEntriesInVM());
+    assertEquals(total - 2, diskStats.getNumEntriesInVM());
     assertEquals(5, diskStats.getNumOverflowOnDisk());
 
-    // Make sure local-invalidate doesn't change anything
+    // Make sure local-invalidate of inVM entry changes inVM count but not disk
     region.localInvalidate(new Integer(total+11));
-    assertEquals(region.entryCount(), diskStats.getNumEntriesInVM() +
+    assertEquals(region.entryCount()-2, diskStats.getNumEntriesInVM() +
                  diskStats.getNumOverflowOnDisk());
-    assertEquals(total - 1, diskStats.getNumEntriesInVM());
+    assertEquals(total - 3, diskStats.getNumEntriesInVM());
     assertEquals(5, diskStats.getNumOverflowOnDisk());
 
-    // Make sure destroy does
+    // Make sure destroy of invalid entry does not change inVM or onDisk but changes entry count
     region.destroy(new Integer(total+10));
-    ((LocalRegion)region).dumpBackingMap();
-    assertEquals(region.entryCount(), diskStats.getNumEntriesInVM() +
+    //((LocalRegion)region).dumpBackingMap();
+    assertEquals(region.entryCount()-1, diskStats.getNumEntriesInVM() +
                  diskStats.getNumOverflowOnDisk());
-    assertEquals(total - 2, diskStats.getNumEntriesInVM());
+    assertEquals(total - 3, diskStats.getNumEntriesInVM());
+    assertEquals(5, diskStats.getNumOverflowOnDisk());
+
+    // Make sure destroy of inVM entry does change inVM but not onDisk
+    region.destroy(new Integer(total+12));
+    assertEquals(region.entryCount()-1, diskStats.getNumEntriesInVM() +
+                 diskStats.getNumOverflowOnDisk());
+    assertEquals(total - 4, diskStats.getNumEntriesInVM());
     assertEquals(5, diskStats.getNumOverflowOnDisk());
 
     // Destroy an entry that has been overflowed
     region.destroy(new Integer(3));
-    assertEquals(region.entryCount(), diskStats.getNumEntriesInVM() +
+    assertEquals(region.entryCount()-1, diskStats.getNumEntriesInVM() +
                  diskStats.getNumOverflowOnDisk());
-    assertEquals(total - 2, diskStats.getNumEntriesInVM());
+    assertEquals(total - 4, diskStats.getNumEntriesInVM());
     assertEquals(4, diskStats.getNumOverflowOnDisk());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/ed85e8eb/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/DiskRegRecoveryJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/DiskRegRecoveryJUnitTest.java b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/DiskRegRecoveryJUnitTest.java
index 792f07e..da20763 100644
--- a/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/DiskRegRecoveryJUnitTest.java
+++ b/geode-core/src/test/java/com/gemstone/gemfire/internal/cache/DiskRegRecoveryJUnitTest.java
@@ -26,7 +26,9 @@ import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
 
+import com.jayway.awaitility.Awaitility;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -1368,7 +1370,7 @@ public class DiskRegRecoveryJUnitTest extends DiskRegionTestingBase {
 
       DiskRegion dr = ((LocalRegion)region).getDiskRegion();
       if (recovValues) {
-        assertEquals(1, dr.getNumEntriesInVM());
+        waitForInVMToBe(dr, 1);
         assertEquals(0, dr.getNumOverflowOnDisk());
       } else {
         assertEquals(0, dr.getNumEntriesInVM());
@@ -1384,13 +1386,10 @@ public class DiskRegRecoveryJUnitTest extends DiskRegionTestingBase {
       region.close();
       region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
       dr = ((LocalRegion)region).getDiskRegion();
-      if (recovValues) {
-        assertEquals(1, dr.getNumEntriesInVM());
-        assertEquals(0, dr.getNumOverflowOnDisk());
-      } else {
-        assertEquals(1, dr.getNumEntriesInVM());
-        assertEquals(0, dr.getNumOverflowOnDisk());
-      }
+      assertEquals(1, region.size());
+      // invalid entries are not inVM since they have no value
+      assertEquals(0, dr.getNumEntriesInVM());
+      assertEquals(0, dr.getNumOverflowOnDisk());
 
       region.clear();
       assertEquals(0, dr.getNumEntriesInVM());
@@ -1402,7 +1401,7 @@ public class DiskRegRecoveryJUnitTest extends DiskRegionTestingBase {
       region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
       dr = ((LocalRegion)region).getDiskRegion();
       if (recovValues) {
-        assertEquals(1, dr.getNumEntriesInVM());
+        waitForInVMToBe(dr, 1);
         assertEquals(0, dr.getNumOverflowOnDisk());
       } else {
         assertEquals(0, dr.getNumEntriesInVM());
@@ -1418,13 +1417,9 @@ public class DiskRegRecoveryJUnitTest extends DiskRegionTestingBase {
       region.close();
       region = DiskRegionHelperFactory.getSyncPersistOnlyRegion(cache, diskProps, Scope.LOCAL);
       dr = ((LocalRegion)region).getDiskRegion();
-      if (recovValues) {
-        assertEquals(1, dr.getNumEntriesInVM());
-        assertEquals(0, dr.getNumOverflowOnDisk());
-      } else {
-        assertEquals(1, dr.getNumEntriesInVM());
-        assertEquals(0, dr.getNumOverflowOnDisk());
-      }
+      // invalid entries have no value so they are not inVM or onDisk
+      assertEquals(0, dr.getNumEntriesInVM());
+      assertEquals(0, dr.getNumOverflowOnDisk());
 
     } finally {
       if (oldValue != null) {
@@ -1435,6 +1430,12 @@ public class DiskRegRecoveryJUnitTest extends DiskRegionTestingBase {
     }
   }
 
+  private void waitForInVMToBe(final DiskRegion dr, final int expected) {
+    // values are recovered async from disk
+    Awaitility.await().pollInterval(10, TimeUnit.MILLISECONDS).pollDelay(10, TimeUnit.MILLISECONDS)
+    .atMost(30, TimeUnit.SECONDS).until(() -> assertEquals(expected, dr.getNumEntriesInVM())); 
+  }
+
   @Test
   public void testVerifyStatsWithValues() {
     basicVerifyStats(true);


Mime
View raw message