geode-commits mailing list archives

From upthewatersp...@apache.org
Subject incubator-geode git commit: GEODE-56 Decrementing bytesOverflowedOnDisk when an update happens to an evicted entry.
Date Sat, 11 Jul 2015 00:47:52 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-56 [created] 9dff599b9


GEODE-56 Decrementing bytesOverflowedOnDisk when an update happens to an evicted entry.

For persistent regions we were already updating this stat correctly, but for
non-persistent regions we did not read the old size on disk, so the stat was
never updated.

If the entry is in the async queue, we only increment the stat once, when the
final value is actually written to disk. So we only decrement the stat if the
old value is not pending an asynchronous write to disk.
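
A minimal standalone sketch of that bookkeeping rule (hypothetical class and
method names, not the GemFire API; the actual change is in DiskEntry.java in
the diff below):

    // Hedged sketch only: illustrates the rule described above.
    public class OverflowStatSketch {
      private long bytesOverflowedOnDisk;

      /** Called when an already-evicted (overflowed) entry is updated. */
      void onUpdateOfEvictedEntry(long oldValueLengthOnDisk,
                                  boolean oldValuePendingAsyncWrite) {
        // If the old value was still sitting in the async queue it never
        // reached disk, so the stat was never incremented for it and there
        // is nothing to subtract now.
        if (!oldValuePendingAsyncWrite) {
          bytesOverflowedOnDisk -= oldValueLengthOnDisk;
        }
      }
    }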

I'm still seeing issues with the entryCount. I've tracked this down to what I
think is a problem with tombstones: it appears that when we modify an entry
that is a tombstone, we increment the entry count once for the new value and
once more for removing the tombstone. But I'm not sure that is the full story.


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/9dff599b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/9dff599b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/9dff599b

Branch: refs/heads/feature/GEODE-56
Commit: 9dff599b9d1e0c04fed60e54a765ad2ef84e77d8
Parents: 7cf940f
Author: Dan Smith <dsmith@pivotal.io>
Authored: Tue Jun 9 18:16:35 2015 -0700
Committer: Dan Smith <dsmith@pivotal.io>
Committed: Fri Jul 10 17:25:49 2015 -0700

----------------------------------------------------------------------
 .../gemfire/internal/cache/BucketRegion.java    |  10 +
 .../gemfire/internal/cache/DiskEntry.java       |  14 +-
 .../cache/PartitionedRegionStatsJUnitTest.java  | 290 ++++++++++++++++++-
 .../cache/PartitionedRegionTestHelper.java      |   3 +-
 .../control/RebalanceOperationDUnitTest.java    | 228 ++++++++++++++-
 5 files changed, 536 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9dff599b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
index b46d897..e9e1523 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketRegion.java
@@ -2247,6 +2247,16 @@ implements Bucket
     result += getNumOverflowBytesOnDisk();
     return result;
   }
+  
+  public long getBytesInMemory() {
+    long result = this.bytesInMemory.get();
+    if(result == BUCKET_DESTROYED) {
+      return 0;
+    }
+    
+    return result;
+  }
+  
 
   public void preDestroyBucket(int bucketId) {
     final IndexUpdater indexUpdater = getIndexUpdater();

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9dff599b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
index b67d0f1..1e61499 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/DiskEntry.java
@@ -981,9 +981,17 @@ public interface DiskEntry extends RegionEntry {
           //disk access exception.
           
           //entry.setValueWithContext(region, newValue); // OFFHEAP newValue already prepared
+          
+          if(did != null && did.isPendingAsync()) {
+            //if the entry was not yet written to disk, we didn't update
+            //the bytes on disk.
+            oldValueLength = 0;
+          } else {
+            oldValueLength = getValueLength(did);
+          }
+          
           if (dr.isBackup()) {
             dr.testIsRecoveredAndClear(did); // fixes bug 41409
-            oldValueLength = getValueLength(did);
             if (dr.isSync()) {
               //In case of compression the value is being set first 
               // because atleast for now , GemFireXD does not support compression
@@ -1588,6 +1596,7 @@ public interface DiskEntry extends RegionEntry {
             try {
               if (Token.isRemovedFromDisk(entryVal)) {
                 // onDisk was already deced so just do the valueLength here
+                dr.incNumOverflowBytesOnDisk(-did.getValueLength());
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      -did.getValueLength());
                 dr.remove(region, entry, true, false);
@@ -1618,6 +1627,7 @@ public interface DiskEntry extends RegionEntry {
                 region.updateSizeOnEvict(entry.getKey(), entryValSize);
                 // note the old size was already accounted for
                 // onDisk was already inced so just do the valueLength here
+                dr.incNumOverflowBytesOnDisk(did.getValueLength());
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      did.getValueLength());
                 try {
@@ -1699,6 +1709,7 @@ public interface DiskEntry extends RegionEntry {
               if (Token.isRemovedFromDisk(entryVal)) {
                 if (region.isThisRegionBeingClosedOrDestroyed()) return;
                 // onDisk was already deced so just do the valueLength here
+                dr.incNumOverflowBytesOnDisk(-did.getValueLength());
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      -did.getValueLength());
                 dr.remove(region, entry, true, false);
@@ -1737,6 +1748,7 @@ public interface DiskEntry extends RegionEntry {
                 region.updateSizeOnEvict(entry.getKey(), entryValSize);
                 // note the old size was already accounted for
                 // onDisk was already inced so just do the valueLength here
+                dr.incNumOverflowBytesOnDisk(did.getValueLength());
                 incrementBucketStats(region, 0/*InVM*/, 0/*OnDisk*/,
                                      did.getValueLength());
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9dff599b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionStatsJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionStatsJUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionStatsJUnitTest.java
index 3711329..bd42e6b 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionStatsJUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionStatsJUnitTest.java
@@ -11,20 +11,33 @@
  */
 package com.gemstone.gemfire.internal.cache;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.Set;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import static org.junit.Assert.*;
-
 import com.gemstone.gemfire.LogWriter;
 import com.gemstone.gemfire.Statistics;
-import com.gemstone.gemfire.cache.*;
+import com.gemstone.gemfire.cache.AttributesFactory;
+import com.gemstone.gemfire.cache.Cache;
+import com.gemstone.gemfire.cache.DataPolicy;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
+import com.gemstone.gemfire.cache.PartitionAttributesFactory;
+import com.gemstone.gemfire.cache.PartitionedRegionStorageException;
+import com.gemstone.gemfire.cache.RegionExistsException;
+import com.gemstone.gemfire.internal.FileUtil;
 import com.gemstone.gemfire.test.junit.categories.IntegrationTest;
 
-import junit.framework.TestCase;
-
 /**
  * @author tapshank, Created on Apr 13, 2006
  *  
@@ -32,12 +45,19 @@ import junit.framework.TestCase;
 @Category(IntegrationTest.class)
 public class PartitionedRegionStatsJUnitTest
 {
+  private static final File DISK_DIR = new File("PRStatsTest");
   LogWriter logger = null;
 
   @Before
   public void setUp() {
     logger = PartitionedRegionTestHelper.getLogger();
   }
+  
+  @After
+  public void tearDown() throws IOException {
+    PartitionedRegionTestHelper.closeCache();
+    FileUtil.delete(DISK_DIR);
+  }
 
   private PartitionedRegion createPR(String name, int lmax, int redundancy) {
     PartitionAttributesFactory paf = new PartitionAttributesFactory();
@@ -57,6 +77,33 @@ public class PartitionedRegionStatsJUnitTest
     }    
     return pr;
   }
+  
+  private PartitionedRegion createPRWithEviction(String name, int lmax, int redundancy, int evictionCount, boolean diskSync, boolean persistent) {
+    PartitionAttributesFactory paf = new PartitionAttributesFactory();
+    paf
+      .setLocalMaxMemory(lmax)
+      .setRedundantCopies(redundancy)
+      .setTotalNumBuckets(13); // set low to reduce logging
+    AttributesFactory af = new AttributesFactory();
+    af.setPartitionAttributes(paf.create());
+    if(persistent) {
+      af.setDataPolicy(DataPolicy.PERSISTENT_PARTITION);
+    }
+    af.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK));
+    af.setDiskStoreName("diskstore");
+    af.setDiskSynchronous(diskSync);
+    Cache cache = PartitionedRegionTestHelper.createCache();
+    DISK_DIR.mkdir();
+    cache.createDiskStoreFactory().setDiskDirs(new File[] {DISK_DIR}).create("diskstore");
+    PartitionedRegion pr = null;
+    try {
+      pr = (PartitionedRegion)cache.createRegion(name, af.create());
+    }
+    catch (RegionExistsException rex) {
+      pr = (PartitionedRegion)cache.getRegion(name);
+    }    
+    return pr;
+  }
     
   /**
    * This test verifies that PR statistics are working properly for
@@ -79,7 +126,7 @@ public class PartitionedRegionStatsJUnitTest
          .fine("PartitionedRegionStatsJUnitTest -  testStats() Completed successfully ... ");
     }
   }
-
+  
   /**
    * This method verifies that PR statistics are working properly for a
    * PartitionedRegion. putsCompleted, getsCompleted, createsCompleted,
@@ -214,4 +261,235 @@ public class PartitionedRegionStatsJUnitTest
      * 2); assertEquals(avgRedundantCopies, 2);
      */
   }
+  
+  public void testOverflowStatsAsync() throws Exception
+  {
+    String regionname = "testStats";
+    int localMaxMemory = 100;
+    PartitionedRegion pr = createPRWithEviction(regionname + 1, localMaxMemory, 0, 1, false, false);
+    validateOverflowStats(pr);
+  }
+  
+  /**
+   * This test verifies that PR statistics are working properly for
+   * single/multiple PartitionedRegions on single node.
+   * 
+   * @throws Exception
+   */
+  public void testOverflowStats() throws Exception
+  {
+    String regionname = "testStats";
+    int localMaxMemory = 100;
+    PartitionedRegion pr = createPRWithEviction(regionname + 1, localMaxMemory, 0, 1, true, false);
+    validateOverflowStats(pr);
+  }
+  
+  public void testPersistOverflowStatsAsync() throws Exception
+  {
+    String regionname = "testStats";
+    int localMaxMemory = 100;
+    PartitionedRegion pr = createPRWithEviction(regionname + 1, localMaxMemory, 0, 1, false, true);
+    validateOverflowStats(pr);
+  }
+  
+  /**
+   * This test verifies that PR statistics are working properly for
+   * single/multiple PartitionedRegions on single node.
+   * 
+   * @throws Exception
+   */
+  public void testPersistOverflowStats() throws Exception
+  {
+    String regionname = "testStats";
+    int localMaxMemory = 100;
+    PartitionedRegion pr = createPRWithEviction(regionname + 1, localMaxMemory, 0, 1, true, true);
+    validateOverflowStats(pr);
+  }
+  
+  private void validateOverflowStats(PartitionedRegion pr) throws Exception  {
+    Statistics stats = pr.getPrStats().getStats();
+    DiskRegionStats diskStats = pr.getDiskRegionStats();
+    
+    assertEquals(0 , stats.getLong("dataStoreBytesInUse"));
+    assertEquals(0 , stats.getInt("dataStoreEntryCount"));
+    assertEquals(0 , diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(0 , diskStats.getNumEntriesInVM());
+    assertEquals(0 , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+
+    
+    int numEntries = 0;
+    
+    pr.put(0, 0);
+    numEntries++;
+    pr.getDiskStore().flush();
+    
+    long singleEntryMemSize = stats.getLong("dataStoreBytesInUse");
+    assertEquals(1 , stats.getInt("dataStoreEntryCount"));
+    assertEquals(0 , diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals(0 , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    pr.put(1, 1);
+    numEntries++;
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(2 , stats.getInt("dataStoreEntryCount"));
+    long entryOverflowSize = diskStats.getNumOverflowBytesOnDisk();
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals(1 , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    assertTrue(entryOverflowSize > 0);
+    
+    for(; numEntries < pr.getTotalNumberOfBuckets() * 5; numEntries++) {
+      pr.put(numEntries, numEntries);
+    }
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+    assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    
+    //Update some entries 
+    for(int i = 0; i < numEntries / 2; i++) {
+      pr.put(i, i*2);
+    }
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+    assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    //Get some entries to trigger evictions
+    for(int i = 0; i < numEntries / 2; i++) {
+      pr.get(i);
+    }
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+    assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    
+    //Remove some entries
+    for(; numEntries > 100; numEntries--) {
+      pr.remove(numEntries);
+    }
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+    assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+
+    //Update the same entry twice
+    pr.put(5, 5);
+    pr.put(5, 6);
+    pr.getDiskStore().flush();
+    
+    assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+    assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+    assertEquals(1 , diskStats.getNumEntriesInVM());
+    assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+    assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+    assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    
+    //Do some random operations
+
+    System.out.println("----Doing random operations");
+    Random rand = new Random(12345L);
+    for(int i =0; i < 1000; i++) {
+      int key = rand.nextInt(numEntries);
+      int op = rand.nextInt(3);
+      switch(op) {
+        case 0:
+          System.out.println("put");
+          pr.put(key, rand.nextInt());
+          break;
+        case 1:
+          System.out.println("get");
+          pr.get(key);
+          break;
+        case 2:
+          System.out.println("remove");
+          pr.remove(key);
+          break;
+      }
+    }
+    
+    pr.getDiskStore().flush();
+    
+    System.out.println("----Done with random operations");
+
+    numEntries = pr.entryCount();
+    
+    if(stats.getLong("dataStoreBytesInUse") == 0) {
+      //It appears we can get into a case here where all entries are overflowed,
+      //rather than just one. I think this may be due to removing the 
+      //one in memory entry.
+      assertEquals(numEntries * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+//    The entry count stats have a bug, they are getting off. It appears
+//    to be related to incrementing the stats multiple times for TOMBSTONES, but
+//    only for async regions
+//      assertEquals(0 , diskStats.getNumEntriesInVM());
+//      assertEquals(numEntries , diskStats.getNumOverflowOnDisk());
+      assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+      assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    } else {
+      assertEquals(singleEntryMemSize, stats.getLong("dataStoreBytesInUse"));
+//    assertEquals(numEntries , stats.getInt("dataStoreEntryCount"));
+      assertEquals((numEntries -1) * entryOverflowSize, diskStats.getNumOverflowBytesOnDisk());
+//    assertEquals(1 , diskStats.getNumEntriesInVM());
+//      assertEquals((numEntries -1) , diskStats.getNumOverflowOnDisk());
+      assertEquals(stats.getLong("dataStoreBytesInUse"), getMemBytes(pr));
+      assertEquals(diskStats.getNumOverflowBytesOnDisk(), getDiskBytes(pr));
+    }
+  }
+
+  private Object getDiskBytes(PartitionedRegion pr) {
+    Set<BucketRegion> brs = pr.getDataStore().getAllLocalBucketRegions();
+    
+    long bytes = 0;
+    for(Iterator<BucketRegion> itr = brs.iterator(); itr.hasNext(); ) {
+      BucketRegion br = itr.next();
+      bytes += br.getNumOverflowBytesOnDisk();
+    }
+    
+    return bytes;
+  }
+
+  private long getMemBytes(PartitionedRegion pr) {
+    Set<BucketRegion> brs = pr.getDataStore().getAllLocalBucketRegions();
+    
+    long bytes = 0;
+    for(Iterator<BucketRegion> itr = brs.iterator(); itr.hasNext(); ) {
+      BucketRegion br = itr.next();
+      bytes += br.getBytesInMemory();
+    }
+    
+    return bytes;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9dff599b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionTestHelper.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionTestHelper.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionTestHelper.java
index 5db33b6..1b38a83 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionTestHelper.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/PartitionedRegionTestHelper.java
@@ -246,7 +246,8 @@ public class PartitionedRegionTestHelper
 public static synchronized void closeCache()
  {
   if(cache != null){
-  cache.close();
+    cache.close();
+    cache = null;
   }
    
  }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/9dff599b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
index a80cb9b..e340640 100644
--- a/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
+++ b/gemfire-core/src/test/java/com/gemstone/gemfire/internal/cache/control/RebalanceOperationDUnitTest.java
@@ -9,9 +9,12 @@ package com.gemstone.gemfire.internal.cache.control;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
+import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.CancellationException;
@@ -28,6 +31,8 @@ import com.gemstone.gemfire.cache.DataPolicy;
 import com.gemstone.gemfire.cache.DiskStore;
 import com.gemstone.gemfire.cache.DiskStoreFactory;
 import com.gemstone.gemfire.cache.EntryEvent;
+import com.gemstone.gemfire.cache.EvictionAction;
+import com.gemstone.gemfire.cache.EvictionAttributes;
 import com.gemstone.gemfire.cache.LoaderHelper;
 import com.gemstone.gemfire.cache.PartitionAttributes;
 import com.gemstone.gemfire.cache.PartitionAttributesFactory;
@@ -1522,7 +1527,7 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
   public void testMoveBucketsWithRedundancySimulation() {
     moveBucketsWithRedundancy(true);
   }
-  
+
   public void testMoveBucketsWithRedundancy() {
     moveBucketsWithRedundancy(false);
   }
@@ -1640,6 +1645,227 @@ public class RebalanceOperationDUnitTest extends CacheTestCase {
     }
   }
   
+  /** A test that the stats when overflowing entries to disk
+   * are correct and we still rebalance correctly
+   */
+  public void testMoveBucketsOverflowToDisk() throws Throwable {
+    
+    System.setProperty("gemfire.LOG_REBALANCE", "true");
+    invokeInEveryVM(new SerializableCallable() {
+      
+      @Override
+      public Object call() throws Exception {
+        System.setProperty("gemfire.LOG_REBALANCE", "true");
+        return null;
+      }
+    });
+
+    Host host = Host.getHost(0);
+    VM vm0 = host.getVM(0);
+    VM vm1 = host.getVM(1);
+    VM vm2 = host.getVM(2);
+    VM vm3 = host.getVM(3);
+
+    SerializableRunnable createPrRegion = new SerializableRunnable("createRegion") {
+      public void run()
+      {
+        Cache cache = getCache();
+        AttributesFactory attr = new AttributesFactory();
+        PartitionAttributesFactory paf = new PartitionAttributesFactory();
+        paf.setRedundantCopies(1);
+        paf.setRecoveryDelay(-1);
+        paf.setStartupRecoveryDelay(-1);
+        PartitionAttributes prAttr = paf.create();
+        attr.setPartitionAttributes(prAttr);
+        attr.setEvictionAttributes(EvictionAttributes.createLRUEntryAttributes(1, EvictionAction.OVERFLOW_TO_DISK));
+        cache.createRegion("region1", attr.create());
+      }
+    };
+    
+    //Create the region in two VMs
+    vm0.invoke(createPrRegion);
+    vm1.invoke(createPrRegion);
+    
+    //Create some buckets
+    vm0.invoke(new SerializableRunnable("createSomeBuckets") {
+      
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region1");
+        for(int i =0; i < 12; i++) {
+          Map m = new HashMap();
+          for (int j = 0; j < 200; j++) {
+            m.put(Integer.valueOf(i + 113*j), "A");
+          }
+          region.putAll(m);
+        }
+      }
+    });
+
+    //Do some puts and gets, to trigger eviction
+    SerializableRunnable doOps = new SerializableRunnable("doOps") {
+      
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region1");
+        
+        Random rand = new Random();
+        
+        for(int count = 0; count < 5000; count++) {
+          int bucket = (int) (count % 12);
+          int key = rand.nextInt(20);
+          region.put(Integer.valueOf(bucket + 113*key), "B");
+        }
+        
+        for(int count = 0; count < 500; count++) {
+          int bucket = (int) (count % 12);
+          int key = rand.nextInt(20);
+          region.get(Integer.valueOf(bucket + 113*key));
+        }
+      }
+    };
+    
+    //Do some operations
+    vm0.invoke(doOps);
+    
+    //Create the region in one more VM.
+    vm2.invoke(createPrRegion);
+    
+    //Now do a rebalance
+    final Long totalSize = (Long) vm0.invoke(new SerializableCallable("simulateRebalance") {
+      
+      public Object call() {
+        Cache cache = getCache();
+        ResourceManager manager = cache.getResourceManager();
+        RebalanceResults results = doRebalance(false, manager);
+        assertEquals(0, results.getTotalBucketCreatesCompleted());
+        //We don't know how many primaries will move, it depends on
+        //if the move bucket code moves the primary or a redundant bucket
+        //assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+        assertEquals(8, results.getTotalBucketTransfersCompleted());
+        assertTrue(0 < results.getTotalBucketTransferBytes());
+        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+        assertEquals(1, detailSet.size());
+        PartitionRebalanceInfo details = detailSet.iterator().next();
+        assertEquals(0, details.getBucketCreatesCompleted());
+        assertTrue(0 < details.getBucketTransferBytes());
+        assertEquals(8, details.getBucketTransfersCompleted());
+        
+        long totalSize = 0;
+        Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsAfter();
+        for(PartitionMemberInfo memberDetails: beforeDetails) {
+          totalSize += memberDetails.getSize();
+        }
+        
+        long afterSize = 0;
+        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+        assertEquals(3, afterDetails.size());
+        for(PartitionMemberInfo memberDetails: afterDetails) {
+          assertEquals(8, memberDetails.getBucketCount());
+          assertEquals(4, memberDetails.getPrimaryCount());
+          afterSize += memberDetails.getSize();
+        }
+        assertEquals(totalSize, afterSize);
+        verifyStats(manager, results);
+        
+        return Long.valueOf(totalSize);
+      }
+    });
+
+    SerializableRunnable checkBalance = new SerializableRunnable("checkBalance") {
+
+      public void run() {
+        Cache cache = getCache();
+        Region region = cache.getRegion("region1");
+        PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+        assertEquals(12, details.getCreatedBucketCount());
+        assertEquals(1,details.getActualRedundantCopies());
+        assertEquals(0,details.getLowRedundancyBucketCount());
+        getLogWriter().info("details=" + details.getPartitionMemberInfo());
+        long afterSize = 0;
+        for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
+          assertEquals(8, memberDetails.getBucketCount());
+          assertEquals(4, memberDetails.getPrimaryCount());
+          afterSize += memberDetails.getSize();
+        }
+        //assertEquals(totalSize.longValue(), afterSize);
+      }
+    };
+
+    vm0.invoke(checkBalance);
+    vm1.invoke(checkBalance);
+    vm2.invoke(checkBalance);
+    
+    //Create the region in one more VM.
+    vm3.invoke(createPrRegion);
+    
+    //Do another rebalance
+    vm0.invoke(new SerializableCallable("simulateRebalance") {
+      
+      public Object call() {
+        Cache cache = getCache();
+        ResourceManager manager = cache.getResourceManager();
+        RebalanceResults results = doRebalance(false, manager);
+        assertEquals(0, results.getTotalBucketCreatesCompleted());
+        //We don't know how many primaries will move, it depends on
+        //if the move bucket code moves the primary or a redundant bucket
+        //assertEquals(0, results.getTotalPrimaryTransfersCompleted());
+        assertEquals(6, results.getTotalBucketTransfersCompleted());
+        assertTrue(0 < results.getTotalBucketTransferBytes());
+        Set<PartitionRebalanceInfo> detailSet = results.getPartitionRebalanceDetails();
+        assertEquals(1, detailSet.size());
+        PartitionRebalanceInfo details = detailSet.iterator().next();
+        assertEquals(0, details.getBucketCreatesCompleted());
+        assertTrue(0 < details.getBucketTransferBytes());
+        assertEquals(6, details.getBucketTransfersCompleted());
+        
+        long totalSize = 0;
+        Set<PartitionMemberInfo> beforeDetails = details.getPartitionMemberDetailsAfter();
+        for(PartitionMemberInfo memberDetails: beforeDetails) {
+          totalSize += memberDetails.getSize();
+        }
+        
+        long afterSize = 0;
+        Set<PartitionMemberInfo> afterDetails = details.getPartitionMemberDetailsAfter();
+        assertEquals(4, afterDetails.size());
+        for(PartitionMemberInfo memberDetails: afterDetails) {
+          assertEquals(6, memberDetails.getBucketCount());
+//          assertEquals(3, memberDetails.getPrimaryCount());
+          afterSize += memberDetails.getSize();
+        }
+        assertEquals(totalSize, afterSize);
+        //TODO - need to fix verifyStats to handle a second rebalance
+//        verifyStats(manager, results);
+        
+        return Long.valueOf(totalSize);
+      }
+    });
+
+      checkBalance = new SerializableRunnable("checkBalance") {
+
+        public void run() {
+          Cache cache = getCache();
+          Region region = cache.getRegion("region1");
+          PartitionRegionInfo details = PartitionRegionHelper.getPartitionRegionInfo(region);
+          assertEquals(12, details.getCreatedBucketCount());
+          assertEquals(1,details.getActualRedundantCopies());
+          assertEquals(0,details.getLowRedundancyBucketCount());
+          getLogWriter().info("details=" + details.getPartitionMemberInfo());
+          long afterSize = 0;
+          for(PartitionMemberInfo memberDetails: details.getPartitionMemberInfo()) {
+            assertEquals(6, memberDetails.getBucketCount());
+            //            assertEquals(3, memberDetails.getPrimaryCount());
+            afterSize += memberDetails.getSize();
+          }
+          //assertEquals(totalSize.longValue(), afterSize);
+        }
+      };
+
+      vm0.invoke(checkBalance);
+      vm1.invoke(checkBalance);
+      vm2.invoke(checkBalance);
+  }
+  
   /**
    * Test to make sure we balance buckets between three hosts with redundancy
    */

