geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dschnei...@apache.org
Subject incubator-geode git commit: fix stat corrupting when async queue full
Date Tue, 26 Jul 2016 22:06:31 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-1700 [created] 193cd9036


Fix stat corruption when the async queue is full

A common method is now used for writing async ops to disk.
The same method is now used both for flushing an item that
has been taken out of the queue and for the case where the
queue is full, which forces a sync write to be done.

Also did a minor refactoring to get rid of the "remove" flag.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/193cd903
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/193cd903
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/193cd903

Branch: refs/heads/feature/GEODE-1700
Commit: 193cd9036aa88d6e0d9e2532e57fd736de2e48e0
Parents: b3b2c89
Author: Darrel Schneider <dschneider@pivotal.io>
Authored: Tue Jul 26 15:03:06 2016 -0700
Committer: Darrel Schneider <dschneider@pivotal.io>
Committed: Tue Jul 26 15:03:06 2016 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/DiskEntry.java       | 140 +++++--------------
 1 file changed, 35 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/193cd903/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index 698e3bd..36c2361 100644
--- a/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/geode-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -1496,82 +1496,7 @@ public interface DiskEntry extends RegionEntry {
 
     
     public static void handleFullAsyncQueue(DiskEntry entry, LocalRegion region, VersionTag
tag) {
-      DiskRegion dr = region.getDiskRegion();
-      DiskId did = entry.getDiskId();
-      synchronized (entry) {
-      dr.acquireReadLock();
-      try {
-        synchronized (did) {
-          if (did.isPendingAsync()) {
-            did.setPendingAsync(false);
-            final Token entryVal = entry.getValueAsToken();
-            final int entryValSize = region.calculateRegionEntryValueSize(entry);
-            boolean remove = false;
-            try {
-              if (Token.isRemovedFromDisk(entryVal)) {
-                // onDisk was already deced so just do the valueLength here
-                dr.incNumOverflowBytesOnDisk(-did.getValueLength());
-                incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-                                     -did.getValueLength());
-                dr.remove(region, entry, true, false);
-                if (dr.isBackup()) {
-                  did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
-                }
-                remove = true;
-              } else if (Token.isInvalid(entryVal) && !dr.isBackup()) {
-                // no need to write invalid to disk if overflow only
-              } else if (entryVal != null) {
-                writeToDisk(entry, region, true);
-              } else {
-                //if we have a version tag we need to record the operation
-                //to update the RVV
-                if(tag != null) {
-                  DiskEntry.Helper.doAsyncFlush(tag, region);
-                }
-                return;
-              }
-              assert !dr.isSync();
-              // Only setValue to null if this was an evict.
-              // We could just be a backup that is writing async.
-              if (!remove
-                  && !Token.isInvalid(entryVal)
-                  && entry instanceof LRUEntry
-                  && ((LRUEntry)entry).testEvicted()) {
-                // Moved this here to fix bug 40116.
-                region.updateSizeOnEvict(entry.getKey(), entryValSize);
-                // note the old size was already accounted for
-                // onDisk was already inced so just do the valueLength here
-                dr.incNumOverflowBytesOnDisk(did.getValueLength());
-                incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
-                                     did.getValueLength());
-                entry.handleValueOverflow(region);
-                entry.setValueWithContext(region,null);
-              }
-              
-              //See if we the entry we wrote to disk has the same tag
-              //as this entry. If not, write the tag as a conflicting operation.
-              //to update the RVV.
-              VersionStamp stamp = entry.getVersionStamp();
-              if(tag != null && stamp != null 
-                  && (stamp.getMemberID() != tag.getMemberID()
-                    || stamp.getRegionVersion() != tag.getRegionVersion())) {
-                DiskEntry.Helper.doAsyncFlush(tag, region);
-              }
-            } catch (RegionClearedException ignore) {
-              // no need to do the op since it was clobbered by a region clear
-            }
-          } else {
-            //if we have a version tag we need to record the operation
-            //to update the RVV, even if we don't write the entry
-            if(tag != null) {
-              DiskEntry.Helper.doAsyncFlush(tag, region);
-            }
-          }
-        }
-      } finally {
-        dr.releaseReadLock();
-      }
-      } // sync entry
+      writeEntryToDisk(entry, region, tag, true);
     }
     
     public static void doAsyncFlush(VersionTag tag, LocalRegion region) {
@@ -1592,12 +1517,25 @@ public interface DiskEntry extends RegionEntry {
     /**
      * Flush an entry that was previously scheduled to be written to disk.
      * @param tag 
+     * @param asyncQueueWasFull true if caller wanted to put this entry in the queue
+     *        but could not do so because it was full
      * @since GemFire prPersistSprint1
      */
     public static void doAsyncFlush(DiskEntry entry, LocalRegion region, VersionTag tag)
{
+      writeEntryToDisk(entry, region, tag, false);
+    }
+    /**
+     * Does a synchronous write to disk for a region that uses async.
+     * This method is used by both doAsyncFlush and handleFullAsyncQueue to fix GEODE-1700.
+     * @param asyncQueueWasFull true if caller wanted to put this entry in the queue
+     *        but could not do so because it was full
+     */
+    private static void writeEntryToDisk(DiskEntry entry, LocalRegion region, VersionTag
tag, boolean asyncQueueWasFull) {
       if (region.isThisRegionBeingClosedOrDestroyed()) return;
       DiskRegion dr = region.getDiskRegion();
-      dr.setClearCountReference();
+      if (!asyncQueueWasFull) {
+        dr.setClearCountReference();
+      }
       synchronized (entry) { // fixes 40116
         // If I don't sync the entry and this method ends up doing an eviction
         // thus setting value to null
@@ -1614,7 +1552,6 @@ public interface DiskEntry extends RegionEntry {
             did.setPendingAsync(false);
             final Token entryVal = entry.getValueAsToken();
             final int entryValSize = region.calculateRegionEntryValueSize(entry);
-            boolean remove = false;
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
                 if (region.isThisRegionBeingClosedOrDestroyed()) return;
@@ -1626,19 +1563,28 @@ public interface DiskEntry extends RegionEntry {
                 if (dr.isBackup()) {
                   did.setKeyId(DiskRegion.INVALID_ID); // fix for bug 41340
                 }
-                remove = true;
               } else if ((Token.isInvalid(entryVal) || entryVal == Token.TOMBSTONE) &&
!dr.isBackup()) {
                 // no need to write invalid or tombstones to disk if overflow only
               } else if (entryVal != null) {
                 writeToDisk(entry, region, true);
+                assert !dr.isSync();
+                // Only setValue to null if this was an evict.
+                // We could just be a backup that is writing async.
+                if (!Token.isInvalid(entryVal)
+                    && (entryVal != Token.TOMBSTONE)
+                    && entry instanceof LRUEntry
+                    && ((LRUEntry)entry).testEvicted()) {
+                  // Moved this here to fix bug 40116.
+                  region.updateSizeOnEvict(entry.getKey(), entryValSize);
+                  dr.incNumEntriesInVM(-1);
+                  dr.incNumOverflowOnDisk(1L);
+                  dr.incNumOverflowBytesOnDisk(did.getValueLength());
+                  incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
+                                       did.getValueLength());
+                  entry.handleValueOverflow(region);
+                  entry.setValueWithContext(region,null);
+                }
               } else {
-                // @todo why would we have a null value here?
-                // I'm seeing it show up in tests:
-// java.lang.IllegalArgumentException: Must not serialize  null  in this context.
-// 	at com.gemstone.gemfire.internal.cache.EntryEventImpl.serialize(EntryEventImpl.java:1024)
-// 	at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.writeToDisk(DiskEntry.java:351)
-// 	at com.gemstone.gemfire.internal.cache.DiskEntry$Helper.doAsyncFlush(DiskEntry.java:683)
-// 	at com.gemstone.gemfire.internal.cache.DiskRegion$FlusherThread.run(DiskRegion.java:1055)
                 //if we have a version tag we need to record the operation
                 //to update the RVV
                 if(tag != null) {
@@ -1646,24 +1592,6 @@ public interface DiskEntry extends RegionEntry {
                 }
                 return;
               }
-              assert !dr.isSync();
-              // Only setValue to null if this was an evict.
-              // We could just be a backup that is writing async.
-              if (!remove
-                  && !Token.isInvalid(entryVal)
-                  && (entryVal != Token.TOMBSTONE)
-                  && entry instanceof LRUEntry
-                  && ((LRUEntry)entry).testEvicted()) {
-                // Moved this here to fix bug 40116.
-                region.updateSizeOnEvict(entry.getKey(), entryValSize);
-                dr.incNumEntriesInVM(-1);
-                dr.incNumOverflowOnDisk(1L);
-                dr.incNumOverflowBytesOnDisk(did.getValueLength());
-                incrementBucketStats(region, -1/*InVM*/, 1/*OnDisk*/,
-                                     did.getValueLength());
-                entry.handleValueOverflow(region);
-                entry.setValueWithContext(region,null);
-              }
             } catch (RegionClearedException ignore) {
               // no need to do the op since it was clobbered by a region clear
             }
@@ -1689,7 +1617,9 @@ public interface DiskEntry extends RegionEntry {
         dr.releaseReadLock();
       }
       } finally {
-        dr.removeClearCountReference();
+        if (!asyncQueueWasFull) {
+          dr.removeClearCountReference();
+        }
       }
       } // sync entry
     }


Mime
View raw message