geode-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ash...@apache.org
Subject incubator-geode git commit: GEODE-124: Create basic auto-balancer skeleton
Date Thu, 23 Jul 2015 17:12:34 GMT
Repository: incubator-geode
Updated Branches:
  refs/heads/feature/GEODE-124 [created] 8abba2046


GEODE-124: Create basic auto-balancer skeleton

* Create a sub-project for auto-rebalancer
* Commit quartz based scheduler to parse and interpret cron strings
* Add a new DistributedLockService and Object for AutoBalancer
* Create stat for auto-rebalance-attempts in ResourceStats


Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/8abba204
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/8abba204
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/8abba204

Branch: refs/heads/feature/GEODE-124
Commit: 8abba2046c7e6316ce189f18e63dda5a22e673e0
Parents: b3e2167
Author: Ashvin Agrawal <ashvina@ashvina.local>
Authored: Sun Jul 19 23:50:09 2015 -0700
Committer: Ashvin Agrawal <ashvina@office-5-130.pa.gopivotal.com>
Committed: Thu Jul 23 10:10:40 2015 -0700

----------------------------------------------------------------------
 .../cache/control/ResourceManagerStats.java     |  13 +
 gemfire-rebalancer/build.gradle                 |   6 +
 .../gemfire/cache/util/AutoBalancer.java        | 327 ++++++++++++++
 .../cache/util/AutoBalancerJUnitTest.java       | 421 +++++++++++++++++++
 settings.gradle                                 |   1 +
 5 files changed, 768 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8abba204/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/ResourceManagerStats.java
----------------------------------------------------------------------
diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/ResourceManagerStats.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/ResourceManagerStats.java
index 79a3c96..38e8034 100644
--- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/ResourceManagerStats.java
+++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/control/ResourceManagerStats.java
@@ -27,6 +27,7 @@ public class ResourceManagerStats {
   
   private static final int rebalancesInProgressId;
   private static final int rebalancesCompletedId;
+  private static final int autoRebalanceAttemptsId;
   private static final int rebalanceTimeId;
   private static final int rebalanceBucketCreatesInProgressId;
   private static final int rebalanceBucketCreatesCompletedId;
@@ -81,6 +82,10 @@ public class ResourceManagerStats {
               "rebalancesCompleted",
               "Total number of cache rebalance operations directed by this process.",
               "operations"),
+            f.createIntCounter(
+                "autoRebalanceAttempts",
+                "Total number of cache auto-rebalance attempts.",
+                "operations"),
             f.createLongCounter(
               "rebalanceTime",
               "Total time spent directing cache rebalance operations.",
@@ -239,6 +244,7 @@ public class ResourceManagerStats {
     
     rebalancesInProgressId = type.nameToId("rebalancesInProgress");
     rebalancesCompletedId = type.nameToId("rebalancesCompleted");
+    autoRebalanceAttemptsId = type.nameToId("autoRebalanceAttempts");
     rebalanceTimeId = type.nameToId("rebalanceTime");
     rebalanceBucketCreatesInProgressId = type.nameToId("rebalanceBucketCreatesInProgress");
     rebalanceBucketCreatesCompletedId = type.nameToId("rebalanceBucketCreatesCompleted");
@@ -293,6 +299,10 @@ public class ResourceManagerStats {
     return System.nanoTime();
   }
   
+  public void incAutoRebalanceAttempts() {
+    this.stats.incInt(autoRebalanceAttemptsId, 1);
+  }
+  
   public void endRebalance(long start) {
     long elapsed = System.nanoTime() - start;
     this.stats.incInt(rebalancesInProgressId, -1);
@@ -373,6 +383,9 @@ public class ResourceManagerStats {
   public int getRebalancesCompleted() {
     return this.stats.getInt(rebalancesCompletedId);
   }
+  public int getAutoRebalanceAttempts() {
+    return this.stats.getInt(autoRebalanceAttemptsId);
+  }
   public long getRebalanceTime() {
     return this.stats.getLong(rebalanceTimeId);
   }

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8abba204/gemfire-rebalancer/build.gradle
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/build.gradle b/gemfire-rebalancer/build.gradle
new file mode 100644
index 0000000..9e4f1d0
--- /dev/null
+++ b/gemfire-rebalancer/build.gradle
@@ -0,0 +1,6 @@
+dependencies {
+    provided project(':gemfire-core')
+    provided project(path: ':gemfire-junit', configuration: 'testOutput')
+
+    compile 'org.quartz-scheduler:quartz:2.2.1'
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8abba204/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
new file mode 100644
index 0000000..aec72cd
--- /dev/null
+++ b/gemfire-rebalancer/src/main/java/com/gemstone/gemfire/cache/util/AutoBalancer.java
@@ -0,0 +1,327 @@
+package com.gemstone.gemfire.cache.util;
+
+import java.util.Date;
+import java.util.Properties;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.logging.log4j.Logger;
+import org.quartz.CronExpression;
+import org.springframework.scheduling.support.CronSequenceGenerator;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.Declarable;
+import com.gemstone.gemfire.cache.GemFireCache;
+import com.gemstone.gemfire.cache.control.RebalanceOperation;
+import com.gemstone.gemfire.cache.control.RebalanceResults;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.internal.logging.LogService;
+
+/**
+ * Re-balancing operation relocates data from heavily loaded members to lightly
+ * loaded members. In most cases, the decision to re-balance is based on the
+ * size of the member and a few other statistics. {@link AutoBalancer} monitors
+ * these statistics and if necessary, triggers a re-balancing request.
+ * Auto-Balancing is expected to prevent failures and data loss.
+ * 
+ * <P>
+ * This implementation is based on {@code Initializer} implementation. By
+ * default auto-balancing is disabled. A user needs to configure
+ * {@link AutoBalancer} during cache initialization
+ * {@link GemFireCache#getInitializer()}
+ * 
+ * <P>
+ * In a cluster only one member owns auto-balancing responsibility. This is
+ * achieved by grabbing a distributed lock. In case of a failure a new member
+ * will grab the lock and manage auto balancing.
+ * 
+ * <P>
+ * {@link AutoBalancer} can be controlled using the following configurations
+ * <OL>
+ * <LI> {@link AutoBalancer#SCHEDULE}
+ * <LI>TBD THRESHOLDS
+ * 
+ * @author Ashvin Agrawal
+ */
+public class AutoBalancer implements Declarable {
+  /**
+   * Use this configuration to provide out-of-balance audit. If the audit finds
+   * the system to be out-of-balance, it will trigger re-balancing. Any valid
+   * cron string is accepted. The first value represent second.
+   * <P>
+   * For. e.g. {@code 0 0 * * * *} for auditing the system every hour
+   */
+  public static final String SCHEDULE = "schedule";
+
+  /**
+   * Name of the DistributedLockService that {@link AutoBalancer} will use to
+   * guard against concurrent maintenance activity
+   */
+  public static final String AUTO_BALANCER_LOCK_SERVICE_NAME = "__AUTO_B";
+
+  public static final Object AUTO_BALANCER_LOCK = "__AUTO_B_LOCK";
+
+  private AuditScheduler scheduler = new CronScheduler();
+  private OOBAuditor auditor = new SizeBasedOOBAuditor();
+  private TimeProvider clock = new SystemClockTimeProvider();
+  private CacheOperationFacade cacheFacade = new GeodeCacheFacade();
+
+  private static final Logger logger = LogService.getLogger();
+
+  @Override
+  public void init(Properties props) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Initiazing " + this.getClass().getSimpleName() + " with " + props);
+    }
+
+    if (props != null) {
+      String schedule = props.getProperty(SCHEDULE);
+
+      auditor.init(props);
+      scheduler.init(schedule);
+    }
+  }
+
+  /**
+   * Invokes audit triggers based on a cron schedule.
+   * <OL>
+   * <LI>computes delay = next slot - current time
+   * <LI>schedules a out-of-balance audit task to be started after delay
+   * computed earlier
+   * <LI>once the audit task completes, it repeats delay computation and task
+   * submission
+   */
+  private class CronScheduler implements AuditScheduler {
+    final ScheduledExecutorService trigger;
+    CronSequenceGenerator generator;
+
+    CronScheduler() {
+      trigger = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+        @Override
+        public Thread newThread(Runnable r) {
+          Thread thread = new Thread(r, "AutoBalancer");
+          thread.setDaemon(true);
+          return thread;
+        }
+      });
+    }
+
+    @Override
+    public void init(String schedule) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Initiazing " + this.getClass().getSimpleName() + " with " + schedule);
+      }
+
+      if (schedule == null || schedule.isEmpty()) {
+        throw new GemFireConfigException("Missing configuration: " + SCHEDULE);
+      }
+      if (!CronExpression.isValidExpression(schedule)) {
+        throw new GemFireConfigException("Invalid schedule: " + schedule);
+      }
+      generator = new CronSequenceGenerator(schedule);
+
+      submitNext();
+    }
+
+    private void submitNext() {
+      long currentTime = clock.currentTimeMillis();
+      Date nextSchedule = generator.next(new Date(currentTime));
+      long delay = nextSchedule.getTime() - currentTime;
+
+      if (logger.isDebugEnabled()) {
+        logger.debug("Now={}, next audit time={}, delay={} ms", new Date(currentTime), nextSchedule, delay);
+      }
+
+      trigger.schedule(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            auditor.execute();
+          } catch (Exception e) {
+            logger.warn("Error while executing out-of-balance audit.", e);
+          }
+          submitNext();
+        }
+      }, delay, TimeUnit.MILLISECONDS);
+    }
+  }
+
+  /**
+   * Queries member statistics and health to determine if a re-balance operation
+   * is needed
+   * <OL>
+   * <LI>acquires distributed lock
+   * <LI>queries member health
+   * <LI>updates auto-balance stat
+   * <LI>release lock
+   */
+  class SizeBasedOOBAuditor implements OOBAuditor {
+    @Override
+    public void init(Properties props) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Initiazing " + this.getClass().getSimpleName());
+      }
+    }
+
+    @Override
+    public void execute() {
+      cacheFacade.incrementAttemptCounter();
+      boolean result = cacheFacade.acquireAutoBalanceLock();
+      if (!result) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Another member owns auto-balance lock. Skip this attempt to rebalance the cluster");
+        }
+        return;
+      }
+      try {
+        cacheFacade.rebalance();
+      } finally {
+        cacheFacade.releaseAutoBalanceLock();
+      }
+    }
+  }
+
+  /**
+   * Hides cache level details and exposes simple methods relevant for
+   * auto-balancing
+   */
+  static class GeodeCacheFacade implements CacheOperationFacade {
+    @Override
+    public void incrementAttemptCounter() {
+      GemFireCacheImpl cache = getCache();
+      try {
+        cache.getResourceManager().getStats().incAutoRebalanceAttempts();
+      } catch (Exception e) {
+        logger.warn("Failed ot increment AutoBalanceAttempts counter");
+      }
+    }
+
+    @Override
+    public void rebalance() {
+      RebalanceOperation operation = getCache().getResourceManager().createRebalanceFactory().simulate();
+      try {
+        RebalanceResults result = operation.getResults();
+        logger.info(result.getTotalBucketTransfersCompleted() + " " + result.getTotalBucketTransferBytes());
+      } catch (CancellationException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      } catch (InterruptedException e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    private GemFireCacheImpl getCache() {
+      GemFireCacheImpl cache = GemFireCacheImpl.getInstance();
+      if (cache == null) {
+        throw new IllegalStateException("Missing cache instance.");
+      }
+      return cache;
+    }
+
+    public boolean acquireAutoBalanceLock() {
+      DistributedLockService dls = getDLS();
+
+      boolean result = dls.lock(AUTO_BALANCER_LOCK, 0, -1);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Grabed AutoBalancer lock? " + result);
+      }
+      return result;
+    }
+
+    public void releaseAutoBalanceLock() {
+      DistributedLockService dls = getDLS();
+      dls.unlock(AUTO_BALANCER_LOCK);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Successfully release auto-balance ownership");
+      }
+    }
+
+    @Override
+    public DistributedLockService getDLS() {
+      GemFireCacheImpl cache = getCache();
+      DistributedLockService dls = DistributedLockService.getServiceNamed(AUTO_BALANCER_LOCK_SERVICE_NAME);
+      if (dls == null) {
+        if (logger.isDebugEnabled()) {
+          logger.debug("Creating DistributeLockService");
+        }
+        dls = DLockService.create(AUTO_BALANCER_LOCK_SERVICE_NAME, cache.getDistributedSystem(), true, true, true);
+      }
+
+      return dls;
+    }
+  }
+
+  private class SystemClockTimeProvider implements TimeProvider {
+    @Override
+    public long currentTimeMillis() {
+      return System.currentTimeMillis();
+    }
+  }
+
+  interface AuditScheduler {
+    void init(String schedule);
+  }
+
+  interface OOBAuditor {
+    void init(Properties props);
+
+    void execute();
+  }
+
+  interface TimeProvider {
+    long currentTimeMillis();
+  }
+
+  interface CacheOperationFacade {
+    boolean acquireAutoBalanceLock();
+
+    void releaseAutoBalanceLock();
+
+    DistributedLockService getDLS();
+
+    void rebalance();
+
+    void incrementAttemptCounter();
+  }
+
+  /**
+   * Test hook to inject custom triggers
+   */
+  void setScheduler(AuditScheduler trigger) {
+    logger.info("Setting custom AuditScheduler");
+    this.scheduler = trigger;
+  }
+
+  /**
+   * Test hook to inject custom auditors
+   */
+  void setOOBAuditor(OOBAuditor auditor) {
+    logger.info("Setting custom Auditor");
+    this.auditor = auditor;
+  }
+
+  OOBAuditor getOOBAuditor() {
+    return auditor;
+  }
+
+  /**
+   * Test hook to inject a clock
+   */
+  void setTimeProvider(TimeProvider clock) {
+    logger.info("Setting custom TimeProvider");
+    this.clock = clock;
+  }
+
+  /**
+   * Test hook to inject a Cache operation facade
+   */
+  public void setCacheOperationFacade(CacheOperationFacade facade) {
+    this.cacheFacade = facade;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8abba204/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
----------------------------------------------------------------------
diff --git a/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
new file mode 100644
index 0000000..76d7e74
--- /dev/null
+++ b/gemfire-rebalancer/src/test/java/com/gemstone/gemfire/cache/util/AutoBalancerJUnitTest.java
@@ -0,0 +1,421 @@
+package com.gemstone.gemfire.cache.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jmock.Expectations;
+import org.jmock.Mockery;
+import org.jmock.Sequence;
+import org.jmock.api.Invocation;
+import org.jmock.lib.action.CustomAction;
+import org.jmock.lib.concurrent.Synchroniser;
+import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.gemstone.gemfire.GemFireConfigException;
+import com.gemstone.gemfire.cache.CacheFactory;
+import com.gemstone.gemfire.cache.util.AutoBalancer.AuditScheduler;
+import com.gemstone.gemfire.cache.util.AutoBalancer.CacheOperationFacade;
+import com.gemstone.gemfire.cache.util.AutoBalancer.GeodeCacheFacade;
+import com.gemstone.gemfire.cache.util.AutoBalancer.OOBAuditor;
+import com.gemstone.gemfire.cache.util.AutoBalancer.TimeProvider;
+import com.gemstone.gemfire.distributed.DistributedLockService;
+import com.gemstone.gemfire.distributed.internal.locks.DLockService;
+import com.gemstone.gemfire.internal.cache.GemFireCacheImpl;
+import com.gemstone.gemfire.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class AutoBalancerJUnitTest {
+
+  // OOB > threshold
+  // OOB < threshold
+  // OOB > threshold && size < min
+  // OOB > threshold && size < min
+  // OOB critical nodes
+  GemFireCacheImpl cache;
+  Mockery mockContext;
+
+  @Before
+  public void setupMock() {
+    mockContext = new Mockery() {
+      {
+        setImposteriser(ClassImposteriser.INSTANCE);
+        setThreadingPolicy(new Synchroniser());
+      }
+    };
+  }
+
+  @After
+  public void destroyCacheAndDLS() {
+    if (DLockService.getServiceNamed(AutoBalancer.AUTO_BALANCER_LOCK_SERVICE_NAME) != null) {
+      DLockService.destroy(AutoBalancer.AUTO_BALANCER_LOCK_SERVICE_NAME);
+    }
+
+    if (cache != null && !cache.isClosed()) {
+      cache.close();
+      cache = null;
+    }
+  }
+
+  @After
+  public void validateMock() {
+    mockContext.assertIsSatisfied();
+    mockContext = null;
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void testNoCacheError() {
+    AutoBalancer balancer = new AutoBalancer();
+    OOBAuditor auditor = balancer.getOOBAuditor();
+    auditor.execute();
+  }
+
+  @Test
+  public void testAutoRebalaceStatsOnLockSuccess() throws InterruptedException {
+    cache = createBasicCache();
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        will(returnValue(true));
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        will(new CustomAction("increment stat") {
+          public Object invoke(Invocation invocation) throws Throwable {
+            new GeodeCacheFacade().incrementAttemptCounter();
+            return null;
+          }
+        });
+        allowing(mockCacheFacade);
+      }
+    });
+
+    assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.getOOBAuditor().execute();
+    assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+  }
+
+  @Test
+  public void testAutoRebalaceStatsOnLockFailure() throws InterruptedException {
+    cache = createBasicCache();
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        will(returnValue(false));
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        will(new CustomAction("increment stat") {
+          public Object invoke(Invocation invocation) throws Throwable {
+            new GeodeCacheFacade().incrementAttemptCounter();
+            return null;
+          }
+        });
+        allowing(mockCacheFacade);
+      }
+    });
+
+    assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.getOOBAuditor().execute();
+    assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+  }
+
+  @Test
+  public void testAutoBalanceStatUpdate() {
+    cache = createBasicCache();
+    assertEquals(0, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+    new GeodeCacheFacade().incrementAttemptCounter();
+    assertEquals(1, cache.getResourceManager().getStats().getAutoRebalanceAttempts());
+  }
+
+  @Test
+  public void testLockSuccess() throws InterruptedException {
+    cache = createBasicCache();
+
+    final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockDLS).lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
+        will(new CustomAction("acquire lock") {
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            DistributedLockService dls = new GeodeCacheFacade().getDLS();
+            return dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
+          }
+        });
+      }
+    });
+
+    final AtomicBoolean success = new AtomicBoolean(false);
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
+          public DistributedLockService getDLS() {
+            return mockDLS;
+          };
+        };
+        success.set(cacheFacade.acquireAutoBalanceLock());
+      }
+    });
+    thread.start();
+    thread.join();
+    assertTrue(success.get());
+  }
+
+  @Test
+  public void testLockAlreadyTakenElsewhere() throws InterruptedException {
+    cache = createBasicCache();
+
+    DistributedLockService dls = new GeodeCacheFacade().getDLS();
+    assertTrue(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
+
+    final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockDLS).lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
+        will(new CustomAction("acquire lock") {
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            DistributedLockService dls = new GeodeCacheFacade().getDLS();
+            return dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0L, -1L);
+          }
+        });
+      }
+    });
+
+    final AtomicBoolean success = new AtomicBoolean(true);
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
+          public DistributedLockService getDLS() {
+            return mockDLS;
+          }
+        };
+        success.set(cacheFacade.acquireAutoBalanceLock());
+      }
+    });
+    thread.start();
+    thread.join();
+    assertFalse(success.get());
+  }
+
+  @Test
+  public void testReleaseLock() throws InterruptedException {
+    cache = createBasicCache();
+
+    final AtomicBoolean success = new AtomicBoolean(false);
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        DistributedLockService dls = new GeodeCacheFacade().getDLS();
+        success.set(dls.lock(AutoBalancer.AUTO_BALANCER_LOCK, 0, -1));
+      }
+    });
+    thread.start();
+    thread.join();
+
+    final DistributedLockService mockDLS = mockContext.mock(DistributedLockService.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockDLS).unlock(AutoBalancer.AUTO_BALANCER_LOCK);
+        will(new CustomAction("release lock") {
+          @Override
+          public Object invoke(Invocation invocation) throws Throwable {
+            DistributedLockService dls = new GeodeCacheFacade().getDLS();
+            dls.unlock(AutoBalancer.AUTO_BALANCER_LOCK);
+            return null;
+          }
+        });
+      }
+    });
+
+    success.set(true);
+    thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        CacheOperationFacade cacheFacade = new GeodeCacheFacade() {
+          public DistributedLockService getDLS() {
+            return mockDLS;
+          }
+        };
+        try {
+          cacheFacade.releaseAutoBalanceLock();
+        } catch (Exception e) {
+          success.set(false);
+        }
+      }
+    });
+    thread.start();
+    thread.join();
+    assertTrue(success.get());
+  }
+
+  @Test
+  public void testLockSequence() throws InterruptedException {
+    cache = createBasicCache();
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    final Sequence lockingSequence = mockContext.sequence("lockingSequence");
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        inSequence(lockingSequence);
+        will(returnValue(true));
+        oneOf(mockCacheFacade).releaseAutoBalanceLock();
+        inSequence(lockingSequence);
+        allowing(mockCacheFacade);
+      }
+    });
+
+    AutoBalancer balancer = new AutoBalancer();
+    balancer.setCacheOperationFacade(mockCacheFacade);
+    balancer.getOOBAuditor().execute();
+  }
+
+  @Test
+  public void testFailExecuteIfLockedElsewhere() throws InterruptedException {
+    cache = createBasicCache();
+
+    final CacheOperationFacade mockCacheFacade = mockContext.mock(CacheOperationFacade.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockCacheFacade).acquireAutoBalanceLock();
+        will(returnValue(false));
+        oneOf(mockCacheFacade).incrementAttemptCounter();
+        // no other methods, rebalance, will be called
+      }
+    });
+
+    Thread thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        AutoBalancer balancer = new AutoBalancer();
+        balancer.setCacheOperationFacade(mockCacheFacade);
+        balancer.getOOBAuditor().execute();
+      }
+    });
+    thread.start();
+    thread.join();
+  }
+
+  @Test
+  public void testInitializerCacheXML() {
+    String configStr = "<cache xmlns=\"http://schema.pivotal.io/gemfire/cache\"                         "
+        + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"                                     "
+        + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\""
+        + " version=\"9.0\">                                                                            "
+        + "   <initializer>                                                                             "
+        + "     <class-name>com.gemstone.gemfire.cache.util.AutoBalancer</class-name>                   "
+        + "     <parameter name=\"schedule\">                                                           "
+        + "       <string>* * * * * ? </string>                                                         "
+        + "     </parameter>                                                                            "
+        + "   </initializer>                                                                            "
+        + " </cache>";
+
+    cache = createBasicCache();
+    cache.loadCacheXml(new ByteArrayInputStream(configStr.getBytes()));
+  }
+
+  private GemFireCacheImpl createBasicCache() {
+    return (GemFireCacheImpl) new CacheFactory().set("mcast-port", "0").create();
+  }
+
+  @Test(expected = GemFireConfigException.class)
+  public void testInitFailOnMissingScheduleConf() {
+    String configStr = "<cache xmlns=\"http://schema.pivotal.io/gemfire/cache\"                         "
+        + " xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"                                     "
+        + " xsi:schemaLocation=\"http://schema.pivotal.io/gemfire/cache http://schema.pivotal.io/gemfire/cache/cache-9.0.xsd\""
+        + " version=\"9.0\">                                                                            "
+        + "   <initializer>                                                                             "
+        + "     <class-name>com.gemstone.gemfire.cache.util.AutoBalancer</class-name>                   "
+        + "   </initializer>                                                                            "
+        + " </cache>";
+
+    cache = createBasicCache();
+    cache.loadCacheXml(new ByteArrayInputStream(configStr.getBytes()));
+  }
+
+  @Test
+  public void testAuditorInvocation() throws InterruptedException {
+    int count = 0;
+
+    final OOBAuditor mockAuditor = mockContext.mock(OOBAuditor.class);
+    final TimeProvider mockClock = mockContext.mock(TimeProvider.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockAuditor).init(with(any(Properties.class)));
+        exactly(2).of(mockAuditor).execute();
+        allowing(mockClock).currentTimeMillis();
+        will(returnValue(950L));
+      }
+    });
+
+    // every second schedule
+    String someSchedule = "* * * * * ?";
+    Properties props = new Properties();
+    props.put(AutoBalancer.SCHEDULE, someSchedule);
+
+    assertEquals(0, count);
+    AutoBalancer autoR = new AutoBalancer();
+    autoR.setOOBAuditor(mockAuditor);
+    autoR.setTimeProvider(mockClock);
+
+    // the trigger should get invoked after 50 milliseconds
+    autoR.init(props);
+
+    TimeUnit.MILLISECONDS.sleep(120);
+  }
+
+  @Test(expected = GemFireConfigException.class)
+  public void testInvalidSchedule() {
+    String someSchedule = "X Y * * * *";
+    Properties props = new Properties();
+    props.put(AutoBalancer.SCHEDULE, someSchedule);
+
+    AutoBalancer autoR = new AutoBalancer();
+    autoR.init(props);
+  }
+
+  @Test
+  public void testAutoBalancerInit() {
+    final String someSchedule = "1 * * * 1 *";
+    final Properties props = new Properties();
+    props.put(AutoBalancer.SCHEDULE, someSchedule);
+
+    final AuditScheduler mockScheduler = mockContext.mock(AuditScheduler.class);
+    final OOBAuditor mockAuditor = mockContext.mock(OOBAuditor.class);
+    mockContext.checking(new Expectations() {
+      {
+        oneOf(mockScheduler).init(someSchedule);
+        oneOf(mockAuditor).init(props);
+      }
+    });
+
+    AutoBalancer autoR = new AutoBalancer();
+    autoR.setScheduler(mockScheduler);
+    autoR.setOOBAuditor(mockAuditor);
+
+    autoR.init(props);
+  }
+
+  @Test
+  public void testNullInitialization() {
+    AutoBalancer autoR = new AutoBalancer();
+    autoR.init(null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/8abba204/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 7f6ed61..4b2da7b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,6 +8,7 @@ include 'gemfire-core'
 include 'gemfire-web'
 include 'gemfire-web-api'
 include 'gemfire-assembly'
+include 'gemfire-rebalancer'
 
 def minimumGradleVersion = '2.3'
 if (GradleVersion.current() < GradleVersion.version(minimumGradleVersion)) {


Mime
View raw message