geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ash...@apache.org
Subject incubator-geode git commit: GEODE-124: Add Auto-Rebalance trigger logic
Date Thu, 23 Jul 2015 22:56:29 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-124 8abba2046 -> dad71e90f


GEODE-124: Add Auto-Rebalance trigger logic

* Add size threshold configuration
* Add logic to trigger rebalance if size to move is above threshold


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/dad71e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/dad71e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/dad71e90

Branch: refs/heads/feature/GEODE-124
Commit: dad71e90f2b7d4ef8d354ea7c77ac1bef0d71a6c
Parents: 8abba20
Author: Ashvin Agrawal <ashvin@apache.org>
Authored: Thu Jul 23 14:18:09 2015 -0700
Committer: Ashvin Agrawal <ashvin@apache.org>
Committed: Thu Jul 23 15:43:21 2015 -0700

----------------------------------------------------------------------
 gemfire-rebalancer/build.gradle                 |   6 +
 .../gemfire/cache/util/AutoBalancer.java        | 143 ++++++++++--
 .../cache/util/AutoBalancerJUnitTest.java       | 219 ++++++++++++++++++-
 3 files changed, 345 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dad71e90/gemfire-rebalancer/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/build.gradle b/gemfire-rebalancer/build.gradle
index 9e4f1d0..d87dd58 100644
--- a/gemfire-rebalancer/build.gradle
+++ b/gemfire-rebalancer/build.gradle
@@ -3,4 +3,10 @@ dependencies {
     provided project(path: ':gemfire-junit', configuration: 'testOutput')
 
     compile 'org.quartz-scheduler:quartz:2.2.1'
+
+    // the following test dependencies are needed for mocking cache instance
+    testRuntime 'org.apache.hadoop:hadoop-common:2.4.1'
+    testRuntime 'org.apache.hadoop:hadoop-hdfs:2.4.1'
+    testRuntime 'com.google.guava:guava:11.0.2'
+    testRuntime 'commons-collections:commons-collections:3.2.1'
 }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dad71e90/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
index aec72cd..19c028f 100644
--- a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
+++ b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
@@ -50,15 +50,40 @@ import com.gemstone.gemfire.internal.logging.LogService;
  */
 public class AutoBalancer implements Declarable {
   /**
-   * Use this configuration to provide out-of-balance audit. If the audit finds
-   * the system to be out-of-balance, it will trigger re-balancing. Any valid
-   * cron string is accepted. The first value represent second.
+   * Use this configuration to manage out-of-balance audit frequency. If the
+   * auditor finds the system to be out-of-balance, it will trigger
+   * re-balancing. Any valid cron string is accepted. The sub-expressions
+   * represent the following:
+   * <OL>
+   * <LI>Seconds
+   * <LI>Minutes
+   * <LI>Hours
+   * <LI>Day-of-Month
+   * <LI>Month
+   * <LI>Day-of-Week
+   * <LI>Year (optional field)
+   * 
    * <P>
-   * For. e.g. {@code 0 0 * * * *} for auditing the system every hour
+   * For. e.g. {@code 0 0 * * * ?} for auditing the system every hour
    */
   public static final String SCHEDULE = "schedule";
 
   /**
+   * Use this configuration to manage the re-balance threshold. A rebalance
+   * operation is triggered if the total number of bytes it would move is at
+   * least this threshold, expressed as a percentage of the total data size.
+   * <P>
+   * Default {@value AutoBalancer#DEFAULT_SIZE_THRESHOLD_PERCENT}
+   */
+  public static final String SIZE_THRESHOLD_PERCENT = "size-threshold-percent";
+
+  /**
+   * Default value of {@link AutoBalancer#SIZE_THRESHOLD_PERCENT}. If 10% of
+   * data is misplaced, it's a good time to redistribute buckets
+   */
+  public static final int DEFAULT_SIZE_THRESHOLD_PERCENT = 10;
+
+  /**
    * Name of the DistributedLockService that {@link AutoBalancer} will use to
    * guard against concurrent maintenance activity
    */
@@ -79,12 +104,13 @@ public class AutoBalancer implements Declarable {
       logger.debug("Initiazing " + this.getClass().getSimpleName() + " with " + props);
     }
 
-    if (props != null) {
-      String schedule = props.getProperty(SCHEDULE);
+    auditor.init(props);
 
-      auditor.init(props);
-      scheduler.init(schedule);
+    String schedule = null;
+    if (props != null) {
+      schedule = props.getProperty(SCHEDULE);
     }
+    scheduler.init(schedule);
   }
 
   /**
@@ -161,11 +187,22 @@ public class AutoBalancer implements Declarable {
    * <LI>release lock
    */
   class SizeBasedOOBAuditor implements OOBAuditor {
+    private int sizeThreshold = DEFAULT_SIZE_THRESHOLD_PERCENT;
+
     @Override
     public void init(Properties props) {
       if (logger.isDebugEnabled()) {
         logger.debug("Initiazing " + this.getClass().getSimpleName());
       }
+
+      if (props != null) {
+        if (props.getProperty(SIZE_THRESHOLD_PERCENT) != null) {
+          sizeThreshold = Integer.valueOf(props.getProperty(SIZE_THRESHOLD_PERCENT));
+          if (sizeThreshold <= 0 || sizeThreshold >= 100) {
+            throw new GemFireConfigException(SIZE_THRESHOLD_PERCENT + " should be integer, 1 to 99");
+          }
+        }
+      }
     }
 
     @Override
@@ -178,12 +215,53 @@ public class AutoBalancer implements Declarable {
         }
         return;
       }
+
       try {
+        result = needsRebalancing();
+        if (!result) {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Rebalancing is not needed");
+          }
+          return;
+        }
+
         cacheFacade.rebalance();
       } finally {
         cacheFacade.releaseAutoBalanceLock();
       }
     }
+
+    /**
+     * By default auto-balancer will avoid rebalancing, because a user can
+     * always trigger rebalance manually. So in case of error or inconsistent
+     * data, return false. Return true if
+     * <OL>
+     * <LI>total transfer size is above threshold percent of total data size at
+     * cluster level
+     * <LI>If some smaller capacity nodes are heavily loaded while bigger
+     * capacity nodes are balanced. In such a scenario transfer size based
+     * trigger may not cause rebalance.
+     */
+    boolean needsRebalancing() {
+      // test cluster level status
+      long transferSize = cacheFacade.getTotalTransferSize();
+      long totalSize = cacheFacade.getTotalDataSize();
+
+      if (totalSize > 0) {
+        int transferPercent = (int) ((100.0 * transferSize) / totalSize);
+        if (transferPercent >= sizeThreshold) {
+          return true;
+        }
+      }
+
+      // TODO test member level skew
+
+      return false;
+    }
+
+    public int getSizeThreshold() {
+      return sizeThreshold;
+    }
   }
 
   /**
@@ -192,6 +270,34 @@ public class AutoBalancer implements Declarable {
    */
   static class GeodeCacheFacade implements CacheOperationFacade {
     @Override
+    public long getTotalDataSize() {
+      // TODO Auto-generated method stub
+      return getTotalTransferSize();
+    }
+
+    @Override
+    public long getTotalTransferSize() {
+      try {
+        RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate();
+        RebalanceResults result = operation.getResults();
+        if (logger.isDebugEnabled()) {
+          logger.debug("Rebalance estimate: RebalanceResultsImpl [TotalBucketCreateBytes="
+              + result.getTotalBucketCreateBytes() + ", TotalBucketCreatesCompleted="
+              + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes="
+              + result.getTotalBucketTransferBytes() + ", TotalBucketTransfersCompleted="
+              + result.getTotalBucketTransfersCompleted() + ", TotalPrimaryTransfersCompleted="
+              + result.getTotalPrimaryTransfersCompleted() + "]");
+        }
+        return result.getTotalBucketTransferBytes();
+      } catch (CancellationException e) {
+        logger.info("Error while trying to estimate rebalance cost ", e);
+      } catch (InterruptedException e) {
+        logger.info("Error while trying to estimate rebalance cost ", e);
+      }
+      return 0;
+    }
+
+    @Override
     public void incrementAttemptCounter() {
       GemFireCacheImpl cache = getCache();
       try {
@@ -203,20 +309,23 @@ public class AutoBalancer implements Declarable {
 
     @Override
     public void rebalance() {
-      RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate();
       try {
+        RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().start();
         RebalanceResults result = operation.getResults();
-        logger.info(result.getTotalBucketTransfersCompleted() + " " + result.getTotalBucketTransferBytes());
+        logger.info("Rebalance result: RebalanceResultsImpl [TotalBucketCreateBytes="
+            + result.getTotalBucketCreateBytes() + ", TotalBucketCreatesCompleted="
+            + result.getTotalBucketCreatesCompleted() + ", TotalBucketTransferBytes="
+            + result.getTotalBucketTransferBytes() + ", TotalBucketTransfersCompleted="
+            + result.getTotalBucketTransfersCompleted() + ", TotalPrimaryTransfersCompleted="
+            + result.getTotalPrimaryTransfersCompleted() + "]");
       } catch (CancellationException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        logger.info("Error rebalancing the cluster", e);
       } catch (InterruptedException e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
+        logger.info("Error rebalancing the cluster", e);
       }
     }
 
-    private GemFireCacheImpl getCache() {
+    GemFireCacheImpl getCache() {
       GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
       if (cache == null) {
         throw new IllegalStateException("Missing cache instance.");
@@ -288,6 +397,10 @@ public class AutoBalancer implements Declarable {
     void rebalance();
 
     void incrementAttemptCounter();
+
+    long getTotalDataSize();
+
+    long getTotalTransferSize();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/dad71e90/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
index 76d7e74..90ddbc3 100644
--- a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
+++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
@@ -3,6 +3,7 @@ package com.gemstone.gemfire.cache.util;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.ByteArrayInputStream;
 import java.util.Properties;
@@ -23,21 +24,24 @@ import org.junit.experimental.categories.Category;
 
 import com.gemstone.gemfire.GemFireConfigException;
 import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.control.RebalanceFactory;
+import com.gemstone.gemfire.cache.control.RebalanceOperation;
+import com.gemstone.gemfire.cache.control.RebalanceResults;
 import com.gemstone.gemfire.cache.util.AutoBalancer.AuditScheduler;
 import com.gemstone.gemfire.cache.util.AutoBalancer.CacheOperationFacade;
 import com.gemstone.gemfire.cache.util.AutoBalancer.GeodeCacheFacade;
 import com.gemstone.gemfire.cache.util.AutoBalancer.OOBAuditor;
+import com.gemstone.gemfire.cache.util.AutoBalancer.SizeBasedOOBAuditor;
 import com.gemstone.gemfire.cache.util.AutoBalancer.TimeProvider;
 import com.gemstone.gemfire.distributed.DistributedLockService;
 import com.gemstone.gemfire.distributed.internal.locks.DLockService;
 import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.cache.control.InternalResourceManager;
 import com.gemstone.gemfire.test.junit.categories.UnitTest;
 
 @Category(UnitTest.class)
 public class AutoBalancerJUnitTest {
 
-  // OOB > threshold
-  // OOB < threshold
   // OOB > threshold && size < min
   // OOB > threshold && size < min
   // OOB critical nodes
@@ -314,6 +318,104 @@ public class AutoBalancerJUnitTest {
   }
 
   @Test
+  public void testFailExecuteIfBalanced() throws InterruptedException {
+    cache = createBasicCache();
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        will(returnValue(true));
+        never(mockCacheFacade).rebalance();
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        oneOf(mockCacheFacade).releaseAutoBalanceLock();
+      }
+    });
+
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+
+    SizeBasedOOBAuditor auditor = balancer.new SizeBasedOOBAuditor() {
+      @Override
+      boolean needsRebalancing() {
+        return false;
+      }
+    };
+    balancer.setOOBAuditor(auditor);
+    balancer.getOOBAuditor().execute();
+  }
+
+  @Test
+  public void testOOBWhenBelowSizeThreshold() {
+    final long totalSize = 1000L;
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        // first run
+        oneOf(mockCacheFacade).getTotalDataSize();
+        will(returnValue(totalSize));
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        // half of threshold limit
+        will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) / 2));
+
+        // second run
+        oneOf(mockCacheFacade).getTotalDataSize();
+        will(returnValue(totalSize));
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        // nothing to transfer
+        will(returnValue(0L));
+      }
+    });
+
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.init(getBasicConfig());
+    SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor();
+
+    // first run
+    assertFalse(auditor.needsRebalancing());
+
+    // second run
+    assertFalse(auditor.needsRebalancing());
+  }
+
+  @Test
+  public void testOOBWhenBelowAboveThreshold() {
+    final long totalSize = 1000L;
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        // first run
+        oneOf(mockCacheFacade).getTotalDataSize();
+        will(returnValue(totalSize));
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        // twice threshold
+        will(returnValue((AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT * totalSize / 100) * 2));
+
+        // second run
+        oneOf(mockCacheFacade).getTotalDataSize();
+        will(returnValue(totalSize));
+        oneOf(mockCacheFacade).getTotalTransferSize();
+        // more than total size
+        will(returnValue(2 * totalSize));
+      }
+    });
+
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.init(getBasicConfig());
+    SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor();
+
+    // first run
+    assertTrue(auditor.needsRebalancing());
+
+    // second run
+    assertTrue(auditor.needsRebalancing());
+  }
+
+  @Test
   public void testInitializerCacheXML() {
     String configStr = "<cache xmlns=\"http://schema.pivotal.io/gemfire/cache\"      
                   "
         + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"                     
                "
@@ -365,10 +467,7 @@ public class AutoBalancerJUnitTest {
       }
     });
 
-    // every second schedule
-    String someSchedule = "* * * * * ?";
-    Properties props = new Properties();
-    props.put(AutoBalancer.SCHEDULE, someSchedule);
+    Properties props = getBasicConfig();
 
     assertEquals(0, count);
     AutoBalancer autoR = new AutoBalancer();
@@ -392,10 +491,50 @@ public class AutoBalancerJUnitTest {
   }
 
   @Test
+  public void testOOBAuditorInit() {
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.init(getBasicConfig());
+    SizeBasedOOBAuditor auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor();
+    assertEquals(AutoBalancer.DEFAULT_SIZE_THRESHOLD_PERCENT, auditor.getSizeThreshold());
+
+    Properties props = getBasicConfig();
+    props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, "17");
+    balancer = new AutoBalancer();
+    balancer.init(props);
+    auditor = (SizeBasedOOBAuditor) balancer.getOOBAuditor();
+    assertEquals(17, auditor.getSizeThreshold());
+  }
+
+  @Test(expected = GemFireConfigException.class)
+  public void testSizeThresholdNegative() {
+    AutoBalancer balancer = new AutoBalancer();
+    Properties props = getBasicConfig();
+    props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, "-1");
+    balancer.init(props);
+  }
+
+  @Test(expected = GemFireConfigException.class)
+  public void testSizeThresholdZero() {
+    AutoBalancer balancer = new AutoBalancer();
+    Properties props = getBasicConfig();
+    props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, "0");
+    balancer.init(props);
+  }
+
+  @Test(expected = GemFireConfigException.class)
+  public void testSizeThresholdToohigh() {
+    AutoBalancer balancer = new AutoBalancer();
+    Properties props = getBasicConfig();
+    props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, "100");
+    balancer.init(props);
+  }
+
+  @Test
   public void testAutoBalancerInit() {
     final String someSchedule = "1 * * * 1 *";
     final Properties props = new Properties();
     props.put(AutoBalancer.SCHEDULE, someSchedule);
+    props.put(AutoBalancer.SIZE_THRESHOLD_PERCENT, 17);
 
     final AuditScheduler mockScheduler = mockContext.mock(AuditScheduler.class);
     final OOBAuditor mockAuditor = mockContext.mock(OOBAuditor.class);
@@ -414,8 +553,72 @@ public class AutoBalancerJUnitTest {
   }
 
   @Test
-  public void testNullInitialization() {
+  public void testMinimalConfiguration() {
     AutoBalancer autoR = new AutoBalancer();
-    autoR.init(null);
+    try {
+      autoR.init(null);
+      fail();
+    } catch (GemFireConfigException e) {
+      // expected
+    }
+
+    Properties props = getBasicConfig();
+    autoR.init(props);
+  }
+
+  @Test
+  public void testFacadeTotalTransferSize() throws Exception {
+    assertEquals(12345, getFacadeForResourceManagerOps(true).getTotalTransferSize());
+  }
+  
+  @Test
+  public void testFacadeRebalance() throws Exception {
+    getFacadeForResourceManagerOps(false).rebalance();
+  }
+
+  private GeodeCacheFacade getFacadeForResourceManagerOps(final boolean simulate) throws Exception {
+    final GemFireCacheImpl mockCache = mockContext.mock(GemFireCacheImpl.class);
+    final InternalResourceManager mockRM = mockContext.mock(InternalResourceManager.class);
+    final RebalanceFactory mockRebalanceFactory = mockContext.mock(RebalanceFactory.class);
+    final RebalanceOperation mockRebalanceOperation = mockContext.mock(RebalanceOperation.class);
+    final RebalanceResults mockRebalanceResults = mockContext.mock(RebalanceResults.class);
+    
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCache).getResourceManager();
+        will(returnValue(mockRM));
+        oneOf(mockRM).createRebalanceFactory();
+        will(returnValue(mockRebalanceFactory));
+        if (simulate) {
+          oneOf(mockRebalanceFactory).simulate();
+        } else {
+          oneOf(mockRebalanceFactory).start();
+        }
+        will(returnValue(mockRebalanceOperation));
+        oneOf(mockRebalanceOperation).getResults();
+        will(returnValue(mockRebalanceResults));
+        if (simulate) {
+          atLeast(1).of(mockRebalanceResults).getTotalBucketTransferBytes();
+          will(returnValue(12345L));
+        }
+        allowing(mockRebalanceResults);
+      }
+    });
+    
+    GeodeCacheFacade facade = new GeodeCacheFacade() {
+      @Override
+      GemFireCacheImpl getCache() {
+        return mockCache;
+      }
+    };
+    
+    return facade;
+  }
+  
+  private Properties getBasicConfig() {
+    Properties props = new Properties();
+    // every second schedule
+    props.put(AutoBalancer.SCHEDULE, "* * * * * ?");
+    return props;
   }
 }


Mime
View raw message