hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject svn commit: r1486258 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/master/balancer/ test/java/org/apache/hadoop/hbase/master/balancer/
Date Sat, 25 May 2013 00:22:59 GMT
Author: eclark
Date: Sat May 25 00:22:59 2013
New Revision: 1486258

URL: http://svn.apache.org/r1486258
Log:
HBASE-8517 Stochastic Loadbalancer isn't finding steady state on real clusters

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java?rev=1486258&r1=1486257&r2=1486258&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java Sat May 25 00:22:59 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hbase.master.balancer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +60,7 @@ public abstract class BaseLoadBalancer i
    */
   protected static class Cluster {
     ServerName[] servers;
-    ArrayList<byte[]> tables;
+    ArrayList<String> tables;
     HRegionInfo[] regions;
     List<RegionLoad>[] regionLoads;
     int[][] regionLocations; //regionIndex -> list of serverIndex sorted by locality
@@ -70,8 +72,10 @@ public abstract class BaseLoadBalancer i
     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
     int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
 
-    Map<ServerName, Integer> serversToIndex;
-    Map<Integer, Integer> tablesToIndex;
+    Integer[] serverIndicesSortedByRegionCount;
+
+    Map<String, Integer> serversToIndex;
+    Map<String, Integer> tablesToIndex;
 
     int numRegions;
     int numServers;
@@ -82,21 +86,35 @@ public abstract class BaseLoadBalancer i
 
     protected Cluster(Map<ServerName, List<HRegionInfo>> clusterState,  Map<String, List<RegionLoad>> loads,
         RegionLocationFinder regionFinder) {
-      serversToIndex = new HashMap<ServerName, Integer>(clusterState.size());
-      tablesToIndex = new HashMap<Integer, Integer>();
+
+      serversToIndex = new HashMap<String, Integer>();
+      tablesToIndex = new HashMap<String, Integer>();
       //regionsToIndex = new HashMap<HRegionInfo, Integer>();
 
       //TODO: We should get the list of tables from master
-      tables = new ArrayList<byte[]>();
+      tables = new ArrayList<String>();
+
 
-      numServers = clusterState.size();
       numRegions = 0;
 
+      int serverIndex = 0;
+
+      // Use hostname and port as there can be dead servers in this list. We want everything with
+      // a matching hostname and port to have the same index.
+      for (ServerName sn:clusterState.keySet()) {
+        if (serversToIndex.get(sn.getHostAndPort()) == null) {
+          serversToIndex.put(sn.getHostAndPort(), serverIndex++);
+        }
+      }
+
+      // Count how many regions there are.
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
         numRegions += entry.getValue().size();
       }
 
-      regionsPerServer = new int[clusterState.size()][];
+      numServers = serversToIndex.size();
+      regionsPerServer = new int[serversToIndex.size()][];
+
       servers = new ServerName[numServers];
       regions = new HRegionInfo[numRegions];
       regionIndexToServerIndex = new int[numRegions];
@@ -104,26 +122,35 @@ public abstract class BaseLoadBalancer i
       regionIndexToTableIndex = new int[numRegions];
       regionLoads = new List[numRegions];
       regionLocations = new int[numRegions][];
+      serverIndicesSortedByRegionCount = new Integer[numServers];
+
+      int tableIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
 
-      int tableIndex = 0, serverIndex = 0, regionIndex = 0, regionPerServerIndex = 0;
-      // populate serversToIndex first
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
-        servers[serverIndex] = entry.getKey();
+        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
+
+        // keep the servername if this is the first server name for this hostname
+        // or this servername has the newest startcode.
+        if (servers[serverIndex] == null ||
+            servers[serverIndex].getStartcode() < entry.getKey().getStartcode()) {
+          servers[serverIndex] = entry.getKey();
+        }
+
         regionsPerServer[serverIndex] = new int[entry.getValue().size()];
-        serversToIndex.put(servers[serverIndex], Integer.valueOf(serverIndex));
-        serverIndex++;
+        serverIndicesSortedByRegionCount[serverIndex] = serverIndex;
       }
-      serverIndex = 0;
+
       for (Entry<ServerName, List<HRegionInfo>> entry : clusterState.entrySet()) {
+        serverIndex = serversToIndex.get(entry.getKey().getHostAndPort());
         regionPerServerIndex = 0;
+
         for (HRegionInfo region : entry.getValue()) {
-          byte[] tableName = region.getTableName();
-          int tableHash = Bytes.mapKey(tableName);
-          Integer idx = tablesToIndex.get(tableHash);
+          String tableName = region.getTableNameAsString();
+          Integer idx = tablesToIndex.get(tableName);
           if (idx == null) {
             tables.add(tableName);
             idx = tableIndex;
-            tablesToIndex.put(tableHash, tableIndex++);
+            tablesToIndex.put(tableName, tableIndex++);
           }
 
           regions[regionIndex] = region;
@@ -132,7 +159,7 @@ public abstract class BaseLoadBalancer i
           regionIndexToTableIndex[regionIndex] = idx;
           regionsPerServer[serverIndex][regionPerServerIndex++] = regionIndex;
 
-          //region load
+          // region load
           if (loads != null) {
             List<RegionLoad> rl = loads.get(region.getRegionNameAsString());
             // That could have failed if the RegionLoad is using the other regionName
@@ -156,7 +183,6 @@ public abstract class BaseLoadBalancer i
 
           regionIndex++;
         }
-        serverIndex++;
       }
 
       numTables = tables.size();
@@ -263,6 +289,53 @@ public abstract class BaseLoadBalancer i
       }
       return regions;
     }
+
+    void sortServersByRegionCount() {
+      Arrays.sort(serverIndicesSortedByRegionCount, numRegionsComparator);
+    }
+
+    int getNumRegions(int server) {
+      return regionsPerServer[server].length;
+    }
+
+    private Comparator<Integer> numRegionsComparator = new Comparator<Integer>() {
+      @Override
+      public int compare(Integer integer, Integer integer2) {
+        return Integer.valueOf(getNumRegions(integer)).compareTo(getNumRegions(integer2));
+      }
+    };
+
+    @Override
+    public String toString() {
+      String desc = "Cluster{" +
+          "servers=[";
+          for(ServerName sn:servers) {
+             desc += sn.getHostAndPort() + ", ";
+          }
+          desc +=
+          ", serverIndicesSortedByRegionCount="+
+          Arrays.toString(serverIndicesSortedByRegionCount) +
+          ", regionsPerServer=[";
+
+          for (int[]r:regionsPerServer) {
+            desc += Arrays.toString(r);
+          }
+          desc += "]" +
+          ", numMaxRegionsPerTable=" +
+          Arrays.toString(numMaxRegionsPerTable) +
+          ", numRegions=" +
+          numRegions +
+          ", numServers=" +
+          numServers +
+          ", numTables=" +
+          numTables +
+          ", numMovedRegions=" +
+          numMovedRegions +
+          ", numMovedMetaRegions=" +
+          numMovedMetaRegions +
+          '}';
+      return desc;
+    }
   }
 
   // slop for regions
@@ -270,7 +343,6 @@ public abstract class BaseLoadBalancer i
   private Configuration config;
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(BaseLoadBalancer.class);
-
   protected MasterServices services;
 
   @Override

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java?rev=1486258&r1=1486257&r2=1486258&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java Sat May 25 00:22:59 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.master.Ma
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -86,48 +87,35 @@ import org.apache.hadoop.hbase.util.Envi
 @InterfaceAudience.Private
 public class StochasticLoadBalancer extends BaseLoadBalancer {
 
-  private static final String STOREFILE_SIZE_COST_KEY =
-      "hbase.master.balancer.stochastic.storefileSizeCost";
-  private static final String MEMSTORE_SIZE_COST_KEY =
-      "hbase.master.balancer.stochastic.memstoreSizeCost";
-  private static final String WRITE_REQUEST_COST_KEY =
-      "hbase.master.balancer.stochastic.writeRequestCost";
-  private static final String READ_REQUEST_COST_KEY =
-      "hbase.master.balancer.stochastic.readRequestCost";
-  private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
-  private static final String TABLE_LOAD_COST_KEY =
-      "hbase.master.balancer.stochastic.tableLoadCost";
-  private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
-  private static final String REGION_LOAD_COST_KEY =
-      "hbase.master.balancer.stochastic.regionLoadCost";
   private static final String STEPS_PER_REGION_KEY =
       "hbase.master.balancer.stochastic.stepsPerRegion";
-  private static final String MAX_STEPS_KEY = "hbase.master.balancer.stochastic.maxSteps";
-  private static final String MAX_MOVES_KEY = "hbase.master.balancer.stochastic.maxMoveRegions";
-  private static final String MAX_RUNNING_TIME_KEY = "hbase.master.balancer.stochastic.maxRunningTime";
-  private static final String KEEP_REGION_LOADS = "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
+  private static final String MAX_STEPS_KEY =
+      "hbase.master.balancer.stochastic.maxSteps";
+  private static final String MAX_RUNNING_TIME_KEY =
+      "hbase.master.balancer.stochastic.maxRunningTime";
+  private static final String KEEP_REGION_LOADS =
+      "hbase.master.balancer.stochastic.numRegionLoadsToRemember";
 
   private static final Random RANDOM = new Random(System.currentTimeMillis());
   private static final Log LOG = LogFactory.getLog(StochasticLoadBalancer.class);
+
   private final RegionLocationFinder regionFinder = new RegionLocationFinder();
   private ClusterStatus clusterStatus = null;
   private Map<String, List<RegionLoad>> loads = new HashMap<String, List<RegionLoad>>();
 
   // values are defaults
-  private int maxSteps = 15000;
-  private int stepsPerRegion = 110;
-  private long maxRunningTime = 60 * 1000; //1 min
-  private int maxMoves = 600;
+  private int maxSteps = 1000000;
+  private int stepsPerRegion = 800;
+  private long maxRunningTime = 60 * 1000 * 1; // 1 min
   private int numRegionLoadsToRemember = 15;
-  private float loadMultiplier = 100;
-  private float moveCostMultiplier = 1;
-  private float tableMultiplier = 5;
-  private float localityMultiplier = 5;
-  private float readRequestMultiplier = 0;
-  private float writeRequestMultiplier = 0;
-  private float memStoreSizeMultiplier = 5;
-  private float storeFileSizeMultiplier = 5;
 
+  private RegionPicker[] pickers;
+  private CostFromRegionLoadFunction[] regionLoadFunctions;
+  private CostFunction[] costFunctions;
+  // Keep locality based picker and cost function to alert them
+  // when new services are offered
+  private LocalityBasedPicker localityPicker;
+  private LocalityCostFunction localityCost;
 
   @Override
   public void setConf(Configuration conf) {
@@ -135,27 +123,38 @@ public class StochasticLoadBalancer exte
     regionFinder.setConf(conf);
 
     maxSteps = conf.getInt(MAX_STEPS_KEY, maxSteps);
-    maxMoves = conf.getInt(MAX_MOVES_KEY, maxMoves);
+
     stepsPerRegion = conf.getInt(STEPS_PER_REGION_KEY, stepsPerRegion);
     maxRunningTime = conf.getLong(MAX_RUNNING_TIME_KEY, maxRunningTime);
 
     numRegionLoadsToRemember = conf.getInt(KEEP_REGION_LOADS, numRegionLoadsToRemember);
 
-    // Load multiplier should be the greatest as it is the most general way to balance data.
-    loadMultiplier = conf.getFloat(REGION_LOAD_COST_KEY, loadMultiplier);
+    localityPicker = new LocalityBasedPicker(services);
+    localityCost = new LocalityCostFunction(conf, services);
 
-    // Move cost multiplier should be the same cost or higer than the rest of the costs to ensure
-    // that two costs must get better to justify a move cost.
-    moveCostMultiplier = conf.getFloat(MOVE_COST_KEY, moveCostMultiplier);
-
-    // These are the added costs so that the stochastic load balancer can get a little bit smarter
-    // about where to move regions.
-    tableMultiplier = conf.getFloat(TABLE_LOAD_COST_KEY, tableMultiplier);
-    localityMultiplier = conf.getFloat(LOCALITY_COST_KEY, localityMultiplier);
-    memStoreSizeMultiplier = conf.getFloat(MEMSTORE_SIZE_COST_KEY, memStoreSizeMultiplier);
-    storeFileSizeMultiplier = conf.getFloat(STOREFILE_SIZE_COST_KEY, storeFileSizeMultiplier);
-    readRequestMultiplier = conf.getFloat(READ_REQUEST_COST_KEY, readRequestMultiplier);
-    writeRequestMultiplier = conf.getFloat(WRITE_REQUEST_COST_KEY, writeRequestMultiplier);
+    pickers = new RegionPicker[] {
+      new RandomRegionPicker(),
+      new LoadPicker(),
+      //localityPicker
+    };
+
+    regionLoadFunctions = new CostFromRegionLoadFunction[] {
+      new ReadRequestCostFunction(conf),
+      new WriteRequestCostFunction(conf),
+      new MemstoreSizeCostFunction(conf),
+      new StoreFileCostFunction(conf)
+    };
+
+    costFunctions = new CostFunction[]{
+      new RegionCountSkewCostFunction(conf),
+      new MoveCostFunction(conf),
+      localityCost,
+      new TableSkewCostFunction(conf),
+      regionLoadFunctions[0],
+      regionLoadFunctions[1],
+      regionLoadFunctions[2],
+      regionLoadFunctions[3],
+    };
   }
 
   @Override
@@ -164,13 +163,18 @@ public class StochasticLoadBalancer exte
     regionFinder.setClusterStatus(st);
     this.clusterStatus = st;
     updateRegionLoad();
+    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
+      cost.setClusterStatus(st);
+    }
   }
 
   @Override
   public void setMasterServices(MasterServices masterServices) {
     super.setMasterServices(masterServices);
-    this.services = masterServices;
     this.regionFinder.setServices(masterServices);
+    this.localityCost.setServices(masterServices);
+    this.localityPicker.setServices(masterServices);
+
   }
 
   /**
@@ -179,78 +183,84 @@ public class StochasticLoadBalancer exte
    */
   @Override
   public List<RegionPlan> balanceCluster(Map<ServerName, List<HRegionInfo>> clusterState) {
-    
-    if (!needsBalance(new ClusterLoadState(clusterState))) {
-      return null;
-    }
+    //if (!needsBalance(new ClusterLoadState(clusterState))) {
+    //  return null;
+    //}
 
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
 
     // Keep track of servers to iterate through them.
-    double currentCost, newCost, initCost;
-
     Cluster cluster = new Cluster(clusterState, loads, regionFinder);
-    currentCost = newCost = initCost = computeCost(cluster);
+    double currentCost = computeCost(cluster, Double.MAX_VALUE);
 
-    int computedMaxSteps =
-        Math.min(this.maxSteps, (cluster.numRegions * this.stepsPerRegion));
+    double initCost = currentCost;
+    double newCost = currentCost;
+
+    long computedMaxSteps = Math.min(this.maxSteps,
+        ((long)cluster.numRegions * (long)this.stepsPerRegion * (long)cluster.numServers));
     // Perform a stochastic walk to see if we can get a good fit.
-    int step;
+    long step;
     for (step = 0; step < computedMaxSteps; step++) {
+      int pickerIdx = RANDOM.nextInt(pickers.length);
+      RegionPicker p = pickers[pickerIdx];
+      Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> picks = p.pick(cluster);
+
+      int leftServer = picks.getFirst().getFirst();
+      int leftRegion = picks.getFirst().getSecond();
+      int rightServer = picks.getSecond().getFirst();
+      int rightRegion = picks.getSecond().getSecond();
 
-      // try and perform a mutation
-      for (int leftServer = 0; leftServer < cluster.numServers; leftServer++) {
-
-        // What server are we going to be swapping regions with ?
-        int rightServer = pickOtherServer(leftServer, cluster);
-        if (rightServer < 0) {
-          continue;
-        }
-
-        // Pick what regions to swap around.
-        // If we get a null for one then this isn't a swap just a move
-        int lRegion = pickRandomRegion(cluster, leftServer, 0);
-        int rRegion = pickRandomRegion(cluster, rightServer, 0.5);
-
-        // We randomly picked to do nothing.
-        if (lRegion < 0 && rRegion < 0) {
-          continue;
-        }
+      // We couldn't find a server
+      if (rightServer < 0 || leftServer < 0) {
+        continue;
+      }
 
-        cluster.moveOrSwapRegion(leftServer, rightServer, lRegion, rRegion);
+      // We randomly picked to do nothing.
+      if (leftRegion < 0 && rightRegion < 0) {
+        continue;
+      }
 
-        newCost = computeCost(cluster);
-        // Should this be kept?
-        if (newCost < currentCost) {
-          currentCost = newCost;
-        } else {
-          // Put things back the way they were before.
-          //TODO: undo by remembering old values, using an UndoAction class
-          cluster.moveOrSwapRegion(leftServer, rightServer, rRegion, lRegion);
-        }
+      cluster.moveOrSwapRegion(leftServer,
+          rightServer,
+          leftRegion,
+          rightRegion);
+
+      newCost = computeCost(cluster, currentCost);
+      // Should this be kept?
+      if (newCost < currentCost) {
+        currentCost = newCost;
+      } else {
+        // Put things back the way they were before.
+        // TODO: undo by remembering old values, using an UndoAction class
+        cluster.moveOrSwapRegion(leftServer,
+            rightServer,
+            rightRegion,
+            leftRegion);
       }
-      if (EnvironmentEdgeManager.currentTimeMillis() - startTime > maxRunningTime) {
+
+      if (EnvironmentEdgeManager.currentTimeMillis() - startTime >
+          maxRunningTime) {
         break;
       }
     }
 
     long endTime = EnvironmentEdgeManager.currentTimeMillis();
 
+
     if (initCost > currentCost) {
       List<RegionPlan> plans = createRegionPlans(cluster);
-
       if (LOG.isDebugEnabled()) {
         LOG.debug("Finished computing new load balance plan.  Computation took "
             + (endTime - startTime) + "ms to try " + step
-            + " different iterations.  Found a solution that moves " + plans.size()
-            + " regions; Going from a computed cost of " + initCost + " to a new cost of "
-            + currentCost);
+            + " different iterations.  Found a solution that moves "
+            + plans.size() + " regions; Going from a computed cost of "
+            + initCost + " to a new cost of " + currentCost);
       }
       return plans;
     }
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Could not find a better load balance plan.  Tried " + step
-          + " different configurations in " + (endTime - startTime)
+      LOG.debug("Could not find a better load balance plan.  Tried "
+          + step + " different configurations in " + (endTime - startTime)
           + "ms, and did not find anything with a computed cost less than " + initCost);
     }
     return null;
@@ -265,14 +275,16 @@ public class StochasticLoadBalancer exte
    */
   private List<RegionPlan> createRegionPlans(Cluster cluster) {
     List<RegionPlan> plans = new LinkedList<RegionPlan>();
-
-    for (int regionIndex = 0; regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
+    for (int regionIndex = 0;
+         regionIndex < cluster.regionIndexToServerIndex.length; regionIndex++) {
       int initialServerIndex = cluster.initialRegionIndexToServerIndex[regionIndex];
       int newServerIndex = cluster.regionIndexToServerIndex[regionIndex];
+
       if (initialServerIndex != newServerIndex) {
         HRegionInfo region = cluster.regions[regionIndex];
         ServerName initialServer = cluster.servers[initialServerIndex];
         ServerName newServer = cluster.servers[newServerIndex];
+
         if (LOG.isTraceEnabled()) {
           LOG.trace("Moving Region " + region.getEncodedName() + " from server "
               + initialServer.getHostname() + " to " + newServer.getHostname());
@@ -284,30 +296,31 @@ public class StochasticLoadBalancer exte
     return plans;
   }
 
-  /** Store the current region loads. */
+  /**
+   * Store the current region loads.
+   */
   private synchronized void updateRegionLoad() {
-
-    //We create a new hashmap so that regions that are no longer there are removed.
-    //However we temporarily need the old loads so we can use them to keep the rolling average.
+    // We create a new hashmap so that regions that are no longer there are removed.
+    // However we temporarily need the old loads so we can use them to keep the rolling average.
     Map<String, List<RegionLoad>> oldLoads = loads;
     loads = new HashMap<String, List<RegionLoad>>();
 
     for (ServerName sn : clusterStatus.getServers()) {
       ServerLoad sl = clusterStatus.getLoad(sn);
-      if (sl == null) continue;
+      if (sl == null) {
+        continue;
+      }
       for (Entry<byte[], RegionLoad> entry : sl.getRegionsLoad().entrySet()) {
         List<RegionLoad> rLoads = oldLoads.get(Bytes.toString(entry.getKey()));
         if (rLoads != null) {
-
-          //We're only going to keep 15.  So if there are that many already take the last 14
+          // Keep at most numRegionLoadsToRemember loads: if full, drop the oldest to make room
           if (rLoads.size() >= numRegionLoadsToRemember) {
-            int numToRemove = 1 +  (rLoads.size() - numRegionLoadsToRemember);
-
+            int numToRemove = 1 + (rLoads.size() - numRegionLoadsToRemember);
             rLoads = rLoads.subList(numToRemove, rLoads.size());
           }
 
         } else {
-          //There was nothing there
+          // There was nothing there
           rLoads = new ArrayList<RegionLoad>();
         }
         rLoads.add(entry.getValue());
@@ -315,322 +328,628 @@ public class StochasticLoadBalancer exte
 
       }
     }
+
+    for(CostFromRegionLoadFunction cost : regionLoadFunctions) {
+      cost.setLoads(loads);
+    }
   }
 
+
   /**
-   * From a list of regions pick a random one. Null can be returned which
-   * {@link StochasticLoadBalancer#balanceCluster(Map)} recognize as signal to try a region move
-   * rather than swap.
+   * This is the main cost function.  It will compute a cost associated with a proposed cluster
+   * state.  All different costs will be combined with their multipliers to produce a double cost.
    *
    * @param cluster The state of the cluster
-   * @param server index of the server
-   * @param chanceOfNoSwap Chance that this will decide to try a move rather
-   *                       than a swap.
-   * @return a random {@link HRegionInfo} or null if an asymmetrical move is
-   *         suggested.
+   * @param previousCost the previous cost. This is used as an early out.
+   * @return a double of a cost associated with the proposed cluster state.  This cost is an
+   *         aggregate of all individual cost functions.
    */
-  private int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
-    //Check to see if this is just a move.
-    if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
-      //signal a move only.
-      return -1;
+  protected double computeCost(Cluster cluster, double previousCost) {
+    double total = 0;
+
+    for (CostFunction c:costFunctions) {
+      if (c.getMultiplier() <= 0) {
+        continue;
+      }
+
+      total += c.getMultiplier() * c.cost(cluster);
+
+      if (total > previousCost) {
+        return total;
+      }
+    }
+    return total;
+  }
+
+  abstract static class RegionPicker {
+    abstract Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster);
+
+    /**
+     * From a list of regions pick a random one. -1 can be returned, which
+     * {@link StochasticLoadBalancer#balanceCluster(Map)} recognizes as a signal to try a region
+     * move rather than a swap.
+     *
+     * @param cluster        The state of the cluster
+     * @param server         index of the server
+     * @param chanceOfNoSwap Chance that this will decide to try a move rather
+     *                       than a swap.
+     * @return the index of a random region on the server, or -1 if an asymmetrical move is
+     *         suggested.
+     */
+    protected int pickRandomRegion(Cluster cluster, int server, double chanceOfNoSwap) {
+      // Check to see if this is just a move.
+      if (cluster.regionsPerServer[server].length == 0 || RANDOM.nextFloat() < chanceOfNoSwap) {
+        // signal a move only.
+        return -1;
+      }
+      int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
+      return cluster.regionsPerServer[server][rand];
+
+    }
+    protected int pickRandomServer(Cluster cluster) {
+      if (cluster.numServers < 1) {
+        return -1;
+      }
+
+      return RANDOM.nextInt(cluster.numServers);
+    }
+    protected int pickOtherRandomServer(Cluster cluster, int serverIndex) {
+      if (cluster.numServers < 2) {
+        return -1;
+      }
+      while (true) {
+        int otherServerIndex = pickRandomServer(cluster);
+        if (otherServerIndex != serverIndex) {
+          return otherServerIndex;
+        }
+      }
+    }
+
+    protected Pair<Integer, Integer> pickRandomRegions(Cluster cluster,
+                                                       int thisServer,
+                                                       int otherServer) {
+      if (thisServer < 0 || otherServer < 0) {
+        return new Pair<Integer, Integer>(-1, -1);
+      }
+
+      // Decide who is most likely to need another region
+      int thisRegionCount = cluster.getNumRegions(thisServer);
+      int otherRegionCount = cluster.getNumRegions(otherServer);
+
+      // Assign the chance based upon the above
+      double thisChance = (thisRegionCount > otherRegionCount) ? 0 : 0.5;
+      double otherChance = (thisRegionCount <= otherRegionCount) ? 0 : 0.5;
+
+      int thisRegion = pickRandomRegion(cluster, thisServer, thisChance);
+      int otherRegion = pickRandomRegion(cluster, otherServer, otherChance);
+
+      return new Pair<Integer, Integer>(thisRegion, otherRegion);
+    }
+  }
+
+  static class RandomRegionPicker extends RegionPicker {
+
+    @Override
+    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+
+      int thisServer = pickRandomServer(cluster);
+
+      // Pick the other server
+      int otherServer = pickOtherRandomServer(cluster, thisServer);
+
+      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
+
+      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
+          new Pair<Integer, Integer>(otherServer, regions.getSecond())
+
+      );
     }
-    int rand = RANDOM.nextInt(cluster.regionsPerServer[server].length);
-    return cluster.regionsPerServer[server][rand];
 
   }
 
-  /**
-   * Given a server we will want to switch regions with another server. This
-   * function picks a random server from the list.
-   *
-   * @param serverIndex Current Server. This server will never be the return value.
-   * @param cluster The state of the cluster
-   * @return random server. Null if no other servers were found.
-   */
-  private int pickOtherServer(int serverIndex, Cluster cluster) {
-    if (cluster.numServers < 2) {
-      return -1;
-    }
-    while (true) {
-      int otherServerIndex = RANDOM.nextInt(cluster.numServers);
-      if (otherServerIndex != serverIndex) {
-        return otherServerIndex;
+  public static class LoadPicker extends RegionPicker {
+
+    @Override
+    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+      cluster.sortServersByRegionCount();
+      int thisServer = pickMostLoadedServer(cluster, -1);
+      int otherServer = pickLeastLoadedServer(cluster, thisServer);
+
+      Pair<Integer, Integer> regions = pickRandomRegions(cluster, thisServer, otherServer);
+      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+          new Pair<Integer, Integer>(thisServer, regions.getFirst()),
+          new Pair<Integer, Integer>(otherServer, regions.getSecond())
+
+      );
+    }
+
+    private int pickLeastLoadedServer(final Cluster cluster, int thisServer) {
+      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+      int index = 0;
+      while (servers[index] == null || servers[index] == thisServer) {
+        index++;
+        if (index == servers.length) {
+          return -1;
+        }
       }
+      return servers[index];
+    }
+
+    private int pickMostLoadedServer(final Cluster cluster, int thisServer) {
+      Integer[] servers = cluster.serverIndicesSortedByRegionCount;
+
+      int index = servers.length - 1;
+      while (servers[index] == null || servers[index] == thisServer) {
+        index--;
+        if (index < 0) {
+          return -1;
+        }
+      }
+      return servers[index];
+    }
+  }
+
+  static class LocalityBasedPicker extends RegionPicker {
+
+    private MasterServices masterServices;
+
+    LocalityBasedPicker(MasterServices masterServices) {
+      this.masterServices = masterServices;
+    }
+
+    @Override
+    Pair<Pair<Integer, Integer>, Pair<Integer, Integer>> pick(Cluster cluster) {
+      if (this.masterServices == null) {
+        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+            new Pair<Integer, Integer>(-1,-1),
+            new Pair<Integer, Integer>(-1,-1)
+        );
+      }
+      // Pick a random region server
+      int thisServer = pickRandomServer(cluster);
+
+      // Pick a random region on this server
+      int thisRegion = pickRandomRegion(cluster, thisServer, 0.0f);
+
+      if (thisRegion == -1) {
+        return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+            new Pair<Integer, Integer>(-1,-1),
+            new Pair<Integer, Integer>(-1,-1)
+        );
+      }
+
+      // Pick the server with the highest locality
+      int otherServer = pickHighestLocalityServer(cluster, thisServer, thisRegion);
+
+      // pick a region on the other server to potentially swap
+      int otherRegion = this.pickRandomRegion(cluster, otherServer, 0.5f);
+
+      return new Pair<Pair<Integer, Integer>, Pair<Integer, Integer>>(
+          new Pair<Integer, Integer>(thisServer,thisRegion),
+          new Pair<Integer, Integer>(otherServer,otherRegion)
+      );
+    }
+
+    private int pickHighestLocalityServer(Cluster cluster, int thisServer, int thisRegion) {
+      int[] regionLocations = cluster.regionLocations[thisRegion];
+
+      if (regionLocations == null || regionLocations.length <= 1) {
+        return pickOtherRandomServer(cluster, thisServer);
+      }
+
+      int idx = 0;
+
+      while (idx < regionLocations.length && regionLocations[idx] == thisServer) {
+        idx++;
+      }
+
+      return idx;
+    }
+
+    void setServices(MasterServices services) {
+      this.masterServices = services;
     }
   }
 
   /**
-   * This is the main cost function.  It will compute a cost associated with a proposed cluster
-   * state.  All different costs will be combined with their multipliers to produce a double cost.
-   *
-   * @param cluster The state of the cluster
-   * @return a double of a cost associated with the proposed
+   * Base class of StochasticLoadBalancer's Cost Functions.
    */
-  protected double computeCost(Cluster cluster) {
-    double moveCost = (moveCostMultiplier > 0) ?
-      (moveCostMultiplier * computeMoveCost(cluster)) :
-      0;
-
-    double regionCountSkewCost = (loadMultiplier > 0) ?
-      (loadMultiplier * computeSkewLoadCost(cluster)) :
-      0;
-
-    double tableSkewCost = (tableMultiplier > 0) ?
-      (tableMultiplier * computeTableSkewLoadCost(cluster)) :
-      0;
-
-    double localityCost = (localityMultiplier > 0) ?
-      (localityMultiplier * computeDataLocalityCost(cluster)) :
-      0;
-
-    double memstoreSizeCost =
-      (memStoreSizeMultiplier > 0) ?
-      (memStoreSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.MEMSTORE_SIZE)) :
-      0;
-
-    double storefileSizeCost =
-      (storeFileSizeMultiplier > 0) ?
-      (storeFileSizeMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.STOREFILE_SIZE)):
-      0;
-
-    double readRequestCost =
-      (readRequestMultiplier > 0) ?
-      (readRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.READ_REQUEST)) :
-      0;
-
-    double writeRequestCost =
-      (writeRequestMultiplier > 0) ?
-      (writeRequestMultiplier * computeRegionLoadCost(cluster, RegionLoadCostType.WRITE_REQUEST)) :
-      0;
-
-    double total =
-      moveCost + regionCountSkewCost + tableSkewCost + localityCost + memstoreSizeCost
-        + storefileSizeCost + readRequestCost + writeRequestCost;
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("Computed weights for a potential balancing total = " + total + " moveCost = "
-        + moveCost + " regionCountSkewCost = " + regionCountSkewCost + " tableSkewCost = "
-        + tableSkewCost + " localityCost = " + localityCost + " memstoreSizeCost = "
-        + memstoreSizeCost + " storefileSizeCost = " + storefileSizeCost);
+  public abstract static class CostFunction {
+
+    private float multiplier = 0;
+    private Configuration conf;
+
+    CostFunction(Configuration c) {
+      this.conf = c;
+    }
+
+    float getMultiplier() {
+      return multiplier;
+    }
+
+    void setMultiplier(float m) {
+      this.multiplier = m;
+    }
+
+    abstract double cost(Cluster cluster);
+
+    /**
+     * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
+     * assumes that this is a zero sum set of costs.  It assumes that the worst case
+     * possible is all of the elements in one region server and the rest having 0.
+     *
+     * @param stats the costs
+     * @return a scaled set of costs.
+     */
+    protected double costFromArray(double[] stats) {
+      double totalCost = 0;
+      double total = getSum(stats);
+      double mean = total/((double)stats.length);
+      double count = stats.length;
+
+      // Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
+      // a zero sum cost for this to make sense.
+      // TODO: Should we make this sum of square errors?
+      double max = ((count - 1) * mean) + (total - mean);
+      for (double n : stats) {
+        double diff = Math.abs(mean - n);
+        totalCost += diff;
+      }
+
+      double scaled =  scale(0, max, totalCost);
+      return scaled;
+    }
+
+
+
+    private double getSum(double[] stats) {
+      double total = 0;
+      for(double s:stats) {
+        total += s;
+      }
+      return total;
+    }
+
+    /**
+     * Scale the value between 0 and 1.
+     *
+     * @param min   Min value
+     * @param max   The Max value
+     * @param value The value to be scaled.
+     * @return The scaled value.
+     */
+    protected double scale(double min, double max, double value) {
+      if (max == 0 || value == 0) {
+        return 0;
+      }
+
+      return Math.max(0d, Math.min(1d, (value - min) / max));
     }
-    return total;
   }
 
   /**
    * Given the starting state of the regions and a potential ending state
    * compute cost based upon the number of regions that have moved.
-   *
-   * @param cluster The state of the cluster
-   * @return The cost. Between 0 and 1.
    */
-  double computeMoveCost(Cluster cluster) {
-    double moveCost = cluster.numMovedRegions;
+  public static class MoveCostFunction extends CostFunction {
+    private static final String MOVE_COST_KEY = "hbase.master.balancer.stochastic.moveCost";
+    private static final String MAX_MOVES_PERCENT_KEY =
+        "hbase.master.balancer.stochastic.maxMovePercent";
+    private static final float DEFAULT_MOVE_COST = 100;
+    private static final int DEFAULT_MAX_MOVES = 600;
+    private static final float DEFAULT_MAX_MOVE_PERCENT = 0.25f;
+    private static final int META_MOVE_COST_MULT = 10;
 
-    //Don't let this single balance move more than the max moves.
-    //This allows better scaling to accurately represent the actual cost of a move.
-    if (moveCost > maxMoves) {
-      return Double.MAX_VALUE;   //return a number much greater than any of the other cost functions
-    }
+    private final float maxMovesPercent;
+
+    MoveCostFunction(Configuration conf) {
+      super(conf);
 
-    //META region is special
-    if (cluster.numMovedMetaRegions > 0) {
-      maxMoves += 9 * cluster.numMovedMetaRegions; //assume each META region move costs 10 times
+      // Move cost multiplier should be the same cost or higher than the rest of the costs to ensure
+      // that large benefits are needed to overcome the cost of a move.
+      this.setMultiplier(conf.getFloat(MOVE_COST_KEY, DEFAULT_MOVE_COST));
+      // What percent of the number of regions a single run of the balancer can move.
+      maxMovesPercent = conf.getFloat(MAX_MOVES_PERCENT_KEY, DEFAULT_MAX_MOVE_PERCENT);
     }
 
-    return scale(0, cluster.numRegions, moveCost);
+    @Override
+    double cost(Cluster cluster) {
+      // Try and size the max number of Moves, but always be prepared to move some.
+      int maxMoves = Math.max((int) (cluster.numRegions * maxMovesPercent),
+          DEFAULT_MAX_MOVES);
+
+      double moveCost = cluster.numMovedRegions;
+
+      // Don't let this single balance move more than the max moves.
+      // This allows better scaling to accurately represent the actual cost of a move.
+      if (moveCost > maxMoves) {
+        return 1000000;   // return a number much greater than any of the other cost
+      }
+
+      // META region is special
+      if (cluster.numMovedMetaRegions > 0) {
+        // assume each META region move costs 10 times
+        moveCost += META_MOVE_COST_MULT * cluster.numMovedMetaRegions;
+      }
+
+      return scale(0, cluster.numRegions + META_MOVE_COST_MULT, moveCost);
+    }
   }
 
   /**
    * Compute the cost of a potential cluster state from skew in number of
-   * regions on a cluster
-   *
-   * @param cluster The state of the cluster
-   * @return The cost of region load imbalance.
+   * regions on a cluster.
    */
-  double computeSkewLoadCost(Cluster cluster) {
-    DescriptiveStatistics stats = new DescriptiveStatistics();
-    for (int[] regions : cluster.regionsPerServer) {
-      stats.addValue(regions.length);
+  public static class RegionCountSkewCostFunction extends CostFunction {
+    private static final String REGION_COUNT_SKEW_COST_KEY =
+        "hbase.master.balancer.stochastic.regionCountCost";
+    private static final float DEFAULT_REGION_COUNT_SKEW_COST = 500;
+
+    private double[] stats = null;
+
+    RegionCountSkewCostFunction(Configuration conf) {
+      super(conf);
+      // Load multiplier should be the greatest as it is the most general way to balance data.
+      this.setMultiplier(conf.getFloat(REGION_COUNT_SKEW_COST_KEY, DEFAULT_REGION_COUNT_SKEW_COST));
+    }
+
+    @Override
+    double cost(Cluster cluster) {
+      if (stats == null || stats.length != cluster.numServers) {
+        stats = new double[cluster.numServers];
+      }
+
+      for (int i =0; i < cluster.numServers; i++) {
+        stats[i] = cluster.regionsPerServer[i].length;
+      }
+      return costFromArray(stats);
     }
-    return costFromStats(stats);
   }
 
   /**
    * Compute the cost of a potential cluster configuration based upon how evenly
    * distributed tables are.
-   *
-   * @param cluster The state of the cluster
-   * @return Cost of imbalance in table.
    */
-  double computeTableSkewLoadCost(Cluster cluster) {
-    double max = cluster.numRegions;
-    double min = cluster.numRegions / cluster.numServers;
-    double value = 0;
+  public static class TableSkewCostFunction extends CostFunction {
+
+    private static final String TABLE_SKEW_COST_KEY =
+        "hbase.master.balancer.stochastic.tableSkewCost";
+    private static final float DEFAULT_TABLE_SKEW_COST = 35;
 
-    for (int i = 0 ; i < cluster.numMaxRegionsPerTable.length; i++) {
-      value += cluster.numMaxRegionsPerTable[i];
+    TableSkewCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
     }
 
-    return scale(min, max, value);
+    @Override
+    double cost(Cluster cluster) {
+      double max = cluster.numRegions;
+      double min = cluster.numRegions / cluster.numServers;
+      double value = 0;
+
+      for (int i = 0; i < cluster.numMaxRegionsPerTable.length; i++) {
+        value += cluster.numMaxRegionsPerTable[i];
+      }
+
+      return scale(min, max, value);
+    }
   }
 
+
   /**
    * Compute a cost of a potential cluster configuration based upon where
    * {@link org.apache.hadoop.hbase.regionserver.StoreFile}s are located.
-   *
-   * @param cluster The state of the cluster
-   * @return A cost between 0 and 1. 0 Means all regions are on the sever with
-   *         the most local store files.
    */
-  double computeDataLocalityCost(Cluster cluster) {
+  public static class LocalityCostFunction extends CostFunction {
 
-    double max = 0;
-    double cost = 0;
+    private static final String LOCALITY_COST_KEY = "hbase.master.balancer.stochastic.localityCost";
+    private static final float DEFAULT_LOCALITY_COST = 25;
 
-    // If there's no master so there's no way anything else works.
-    if (this.services == null) return cost;
+    private MasterServices services;
 
-    for (int i = 0; i < cluster.regionLocations.length; i++) {
-      max += 1;
-      int serverIndex = cluster.regionIndexToServerIndex[i];
-      int[] regionLocations = cluster.regionLocations[i];
+    LocalityCostFunction(Configuration conf, MasterServices srv) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(LOCALITY_COST_KEY, DEFAULT_LOCALITY_COST));
+      this.services = srv;
+    }
 
-      // If we can't find where the data is getTopBlock returns null.
-      // so count that as being the best possible.
-      if (regionLocations == null) {
-        continue;
+    void setServices(MasterServices srvc) {
+      this.services = srvc;
+    }
+
+    @Override
+    double cost(Cluster cluster) {
+      double max = 0;
+      double cost = 0;
+
+      // If there's no master then there's no way anything else works.
+      if (this.services == null) {
+        return cost;
       }
 
-      int index = -1;
-      for (int j = 0; j < regionLocations.length; j++) {
-        if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
-          index = j;
-          break;
+      for (int i = 0; i < cluster.regionLocations.length; i++) {
+        max += 1;
+        int serverIndex = cluster.regionIndexToServerIndex[i];
+        int[] regionLocations = cluster.regionLocations[i];
+
+        // If we can't find where the data is, getTopBlock returns null,
+        // so count that as being the best possible.
+        if (regionLocations == null) {
+          continue;
         }
-      }
 
-      if (index < 0) {
-        cost += 1;
-      } else {
-        cost += (double) index / (double) regionLocations.length;
+        int index = -1;
+        for (int j = 0; j < regionLocations.length; j++) {
+          if (regionLocations[j] >= 0 && regionLocations[j] == serverIndex) {
+            index = j;
+            break;
+          }
+        }
+
+        if (index < 0) {
+          cost += 1;
+        } else {
+          cost += (double) index / (double) regionLocations.length;
+        }
       }
+      return scale(0, max, cost);
     }
-    return scale(0, max, cost);
-  }
-
-  /** The cost's that can be derived from RegionLoad */
-  private enum RegionLoadCostType {
-    READ_REQUEST, WRITE_REQUEST, MEMSTORE_SIZE, STOREFILE_SIZE
   }
 
   /**
-   * Compute the cost of the current cluster state due to some RegionLoadCost type
-   *
-   * @param cluster The state of the cluster
-   * @param costType     what type of cost to consider
-   * @return the scaled cost.
+   * Base class that allows writing cost functions from a rolling average of some
+   * number from RegionLoad.
    */
-  private double computeRegionLoadCost(Cluster cluster, RegionLoadCostType costType) {
+  public abstract static class CostFromRegionLoadFunction extends CostFunction {
+
+    private ClusterStatus clusterStatus = null;
+    private Map<String, List<RegionLoad>> loads = null;
+    private double[] stats = null;
+    CostFromRegionLoadFunction(Configuration conf) {
+      super(conf);
+    }
+
+    void setClusterStatus(ClusterStatus status) {
+      this.clusterStatus = status;
+    }
+
+    void setLoads(Map<String, List<RegionLoad>> l) {
+      this.loads = l;
+    }
+
+
+    double cost(Cluster cluster) {
+      if (clusterStatus == null || loads == null) {
+        return 0;
+      }
+
+      if (stats == null || stats.length != cluster.numServers) {
+        stats = new double[cluster.numServers];
+      }
+
+      for (int i =0; i < stats.length; i++) {
+        //Cost this server has from RegionLoad
+        long cost = 0;
+
+        // for every region on this server get the rl
+        for(int regionIndex:cluster.regionsPerServer[i]) {
+          List<RegionLoad> regionLoadList =  cluster.regionLoads[regionIndex];
+
+          // Now if we found a region load get the type of cost that was requested.
+          if (regionLoadList != null) {
+            cost += getRegionLoadCost(regionLoadList);
+          }
+        }
+
+        // Add the total cost to the stats.
+        stats[i] = cost;
+      }
+
+      // Now return the scaled cost from data held in the stats object.
+      return costFromArray(stats);
+    }
 
-    if (this.clusterStatus == null || this.loads == null || this.loads.size() == 0) return 0;
+    protected double getRegionLoadCost(List<RegionLoad> regionLoadList) {
+      double cost = 0;
 
-    DescriptiveStatistics stats = new DescriptiveStatistics();
+      for (RegionLoad rl : regionLoadList) {
+        double toAdd = getCostFromRl(rl);
 
-    for (List<RegionLoad> rl : cluster.regionLoads) {
-      long cost = 0; //Cost this server has from RegionLoad
-        // Now if we found a region load get the type of cost that was requested.
-      if (rl != null) {
-        cost += getRegionLoadCost(rl, costType);
+        if (cost == 0) {
+          cost = toAdd;
+        } else {
+          cost = (.5 * cost) + (.5 * toAdd);
+        }
       }
 
-      // Add the total cost to the stats.
-      stats.addValue(cost);
+      return cost;
     }
 
-    // No return the scaled cost from data held in the stats object.
-    return costFromStats(stats);
+    protected abstract double getCostFromRl(RegionLoad rl);
   }
 
   /**
-   * Get the un-scaled cost from a RegionLoad
-   *
-   * @param regionLoadList   the Region load List
-   * @param type The type of cost to extract
-   * @return the double representing the cost
-   */
-  private double getRegionLoadCost(List<RegionLoad> regionLoadList, RegionLoadCostType type) {
-    double cost = 0;
-
-    int size = regionLoadList.size();
-    for(int i =0; i< size; i++) {
-      RegionLoad rl = regionLoadList.get(i);
-      double toAdd = 0;
-      switch (type) {
-        case READ_REQUEST:
-          toAdd =  rl.getReadRequestsCount();
-          break;
-        case WRITE_REQUEST:
-          toAdd =  rl.getWriteRequestsCount();
-          break;
-        case MEMSTORE_SIZE:
-          toAdd =  rl.getMemStoreSizeMB();
-          break;
-        case STOREFILE_SIZE:
-          toAdd =  rl.getStorefileSizeMB();
-          break;
-        default:
-          assert false : "RegionLoad cost type not supported.";
-          return 0;
-      }
+   * Compute the cost of total number of read requests.  The more unbalanced the higher the
+   * computed cost will be.  This uses a rolling average of regionload.
+   */
 
-      if (cost == 0) {
-        cost = toAdd;
-      } else {
-        cost = (.5 * cost) + (.5 * toAdd);
-      }
+  public static class ReadRequestCostFunction extends CostFromRegionLoadFunction {
+
+    private static final String READ_REQUEST_COST_KEY =
+        "hbase.master.balancer.stochastic.readRequestCost";
+    private static final float DEFAULT_READ_REQUEST_COST = 5;
+
+    ReadRequestCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(READ_REQUEST_COST_KEY, DEFAULT_READ_REQUEST_COST));
     }
 
-    return cost;
 
+    protected double getCostFromRl(RegionLoad rl) {
+      return rl.getReadRequestsCount();
+    }
   }
 
   /**
-   * Function to compute a scaled cost using {@link DescriptiveStatistics}. It
-   * assumes that this is a zero sum set of costs.  It assumes that the worst case
-   * possible is all of the elements in one region server and the rest having 0.
-   *
-   * @param stats the costs
-   * @return a scaled set of costs.
+   * Compute the cost of total number of write requests.  The more unbalanced the higher the
+   * computed cost will be.  This uses a rolling average of regionload.
    */
-  double costFromStats(DescriptiveStatistics stats) {
-    double totalCost = 0;
-    double mean = stats.getMean();
+  public static class WriteRequestCostFunction extends CostFromRegionLoadFunction {
 
-    //Compute max as if all region servers had 0 and one had the sum of all costs.  This must be
-    // a zero sum cost for this to make sense.
-    //TODO: Should we make this sum of square errors?
-    double max = ((stats.getN() - 1) * mean) + (stats.getSum() - mean);
-    for (double n : stats.getValues()) {
-      double diff = Math.abs(mean - n);
-      totalCost += diff;
+    private static final String WRITE_REQUEST_COST_KEY =
+        "hbase.master.balancer.stochastic.writeRequestCost";
+    private static final float DEFAULT_WRITE_REQUEST_COST = 5;
+
+    WriteRequestCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(WRITE_REQUEST_COST_KEY, DEFAULT_WRITE_REQUEST_COST));
     }
 
-    return scale(0, max, totalCost);
+    protected double getCostFromRl(RegionLoad rl) {
+      return rl.getWriteRequestsCount();
+    }
   }
 
   /**
-   * Scale the value between 0 and 1.
-   *
-   * @param min   Min value
-   * @param max   The Max value
-   * @param value The value to be scaled.
-   * @return The scaled value.
+   * Compute the cost of total memstore size.  The more unbalanced the higher the
+   * computed cost will be.  This uses a rolling average of regionload.
+   */
+  public static class MemstoreSizeCostFunction extends CostFromRegionLoadFunction {
+
+    private static final String MEMSTORE_SIZE_COST_KEY =
+        "hbase.master.balancer.stochastic.memstoreSizeCost";
+    private static final float DEFAULT_MEMSTORE_SIZE_COST = 5;
+
+    MemstoreSizeCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(MEMSTORE_SIZE_COST_KEY, DEFAULT_MEMSTORE_SIZE_COST));
+    }
+
+    @Override
+    protected double getCostFromRl(RegionLoad rl) {
+      return rl.getMemStoreSizeMB();
+    }
+  }
+  /**
+   * Compute the cost of total open storefiles size.  The more unbalanced the higher the
+   * computed cost will be.  This uses a rolling average of regionload.
    */
-  private double scale(double min, double max, double value) {
-    if (max == 0 || value == 0) {
-      return 0;
+  public static class StoreFileCostFunction extends CostFromRegionLoadFunction {
+
+    private static final String STOREFILE_SIZE_COST_KEY =
+        "hbase.master.balancer.stochastic.storefileSizeCost";
+    private static final float DEFAULT_STOREFILE_SIZE_COST = 5;
+
+    StoreFileCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(STOREFILE_SIZE_COST_KEY, DEFAULT_STOREFILE_SIZE_COST));
     }
 
-    return Math.max(0d, Math.min(1d, (value - min) / max));
+    @Override
+    protected double getCostFromRl(RegionLoad rl) {
+      return rl.getStorefileSizeMB();
+    }
   }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java?rev=1486258&r1=1486257&r2=1486258&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/BalancerTestBase.java Sat May 25 00:22:59 2013
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Byte
  */
 public class BalancerTestBase {
 
-  private static Random rand = new Random();
+  protected static Random rand = new Random();
   static int regionId = 0;
 
   /**
@@ -125,7 +125,9 @@ public class BalancerTestBase {
    * @param plans
    * @return
    */
-  protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list, List<RegionPlan> plans) {
+  protected List<ServerAndLoad> reconcile(List<ServerAndLoad> list,
+                                          List<RegionPlan> plans,
+                                          Map<ServerName, List<HRegionInfo>> servers) {
     List<ServerAndLoad> result = new ArrayList<ServerAndLoad>(list.size());
     if (plans == null) return result;
     Map<ServerName, ServerAndLoad> map = new HashMap<ServerName, ServerAndLoad>(list.size());
@@ -134,9 +136,13 @@ public class BalancerTestBase {
     }
     for (RegionPlan plan : plans) {
       ServerName source = plan.getSource();
+
       updateLoad(map, source, -1);
       ServerName destination = plan.getDestination();
       updateLoad(map, destination, +1);
+
+      servers.get(source).remove(plan.getRegionInfo());
+      servers.get(destination).add(plan.getRegionInfo());
     }
     result.clear();
     result.addAll(map.values());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java?rev=1486258&r1=1486257&r2=1486258&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestDefaultLoadBalancer.java Sat May 25 00:22:59 2013
@@ -116,7 +116,7 @@ public class TestDefaultLoadBalancer ext
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
       List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans);
+      List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
       LOG.info("Mock Balance : " + printMock(balancedCluster));
       assertClusterAsBalanced(balancedCluster);
       for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java?rev=1486258&r1=1486257&r2=1486258&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java Sat May 25 00:22:59 2013
@@ -19,10 +19,14 @@ package org.apache.hadoop.hbase.master.b
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +38,6 @@ import org.apache.hadoop.hbase.MediumTes
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -46,6 +49,7 @@ public class TestStochasticLoadBalancer 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
     Configuration conf = HBaseConfiguration.create();
+    conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 0.75f);
     loadBalancer = new StochasticLoadBalancer();
     loadBalancer.setConf(conf);
   }
@@ -101,6 +105,10 @@ public class TestStochasticLoadBalancer 
       new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 10},
       new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 123},
       new int[]{1, 1, 1, 1, 1, 1, 1, 1, 1, 155},
+      new int[]{10, 7, 12, 8, 11, 10, 9, 14},
+      new int[]{13, 14, 6, 10, 10, 10, 8, 10},
+      new int[]{130, 14, 60, 10, 100, 10, 80, 10},
+      new int[]{130, 140, 60, 100, 100, 100, 80, 100}
   };
 
   /**
@@ -119,9 +127,11 @@ public class TestStochasticLoadBalancer 
       List<ServerAndLoad> list = convertToList(servers);
       LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
       List<RegionPlan> plans = loadBalancer.balanceCluster(servers);
-      List<ServerAndLoad> balancedCluster = reconcile(list, plans);
+      List<ServerAndLoad> balancedCluster = reconcile(list, plans, servers);
       LOG.info("Mock Balance : " + printMock(balancedCluster));
       assertClusterAsBalanced(balancedCluster);
+      List<RegionPlan> secondPlans =  loadBalancer.balanceCluster(servers);
+      assertNull(secondPlans);
       for (Map.Entry<ServerName, List<HRegionInfo>> entry : servers.entrySet()) {
         returnRegions(entry.getValue());
         returnServer(entry.getKey());
@@ -132,56 +142,96 @@ public class TestStochasticLoadBalancer 
 
   @Test
   public void testSkewCost() {
+    Configuration conf = HBaseConfiguration.create();
+    StochasticLoadBalancer.CostFunction
+        costFunction = new StochasticLoadBalancer.RegionCountSkewCostFunction(conf);
     for (int[] mockCluster : clusterStateMocks) {
-      double cost = loadBalancer.computeSkewLoadCost(mockCluster(mockCluster));
+      double cost = costFunction.cost(mockCluster(mockCluster));
       assertTrue(cost >= 0);
       assertTrue(cost <= 1.01);
     }
     assertEquals(1,
-      loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 0, 1 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{0, 0, 0, 0, 1})), 0.01);
     assertEquals(.75,
-      loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 0, 1, 1 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{0, 0, 0, 1, 1})), 0.01);
     assertEquals(.5,
-      loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 0, 1, 1, 1 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{0, 0, 1, 1, 1})), 0.01);
     assertEquals(.25,
-      loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 0, 1, 1, 1, 1 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{0, 1, 1, 1, 1})), 0.01);
     assertEquals(0,
-      loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 1, 1, 1, 1, 1 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{1, 1, 1, 1, 1})), 0.01);
     assertEquals(0,
-        loadBalancer.computeSkewLoadCost(mockCluster(new int[] { 10, 10, 10, 10, 10 })), 0.01);
+        costFunction.cost(mockCluster(new int[]{10, 10, 10, 10, 10})), 0.01);
   }
 
   @Test
   public void testTableSkewCost() {
+    Configuration conf = HBaseConfiguration.create();
+    StochasticLoadBalancer.CostFunction
+        costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
     for (int[] mockCluster : clusterStateMocks) {
       BaseLoadBalancer.Cluster cluster = mockCluster(mockCluster);
-      double cost = loadBalancer.computeTableSkewLoadCost(cluster);
+      double cost = costFunction.cost(cluster);
       assertTrue(cost >= 0);
       assertTrue(cost <= 1.01);
     }
   }
 
   @Test
-  public void testCostFromStats() {
-    DescriptiveStatistics statOne = new DescriptiveStatistics();
+  public void testCostFromArray() {
+    Configuration conf = HBaseConfiguration.create();
+    StochasticLoadBalancer.CostFromRegionLoadFunction
+        costFunction = new StochasticLoadBalancer.MemstoreSizeCostFunction(conf);
+
+    double[] statOne = new double[100];
     for (int i =0; i < 100; i++) {
-      statOne.addValue(10);
+      statOne[i] = 10;
     }
-    assertEquals(0, loadBalancer.costFromStats(statOne), 0.01);
+    assertEquals(0, costFunction.costFromArray(statOne), 0.01);
 
-    DescriptiveStatistics statTwo = new DescriptiveStatistics();
+    double[] statTwo= new double[101];
     for (int i =0; i < 100; i++) {
-      statTwo.addValue(0);
+      statTwo[i] = 0;
     }
-    statTwo.addValue(100);
-    assertEquals(1, loadBalancer.costFromStats(statTwo), 0.01);
+    statTwo[100] = 100;
+    assertEquals(1, costFunction.costFromArray(statTwo), 0.01);
 
-    DescriptiveStatistics statThree = new DescriptiveStatistics();
+    double[] statThree = new double[200];
     for (int i =0; i < 100; i++) {
-      statThree.addValue(0);
-      statThree.addValue(100);
+      statThree[i] = (0);
+      statThree[i+100] = 100;
     }
-    assertEquals(0.5, loadBalancer.costFromStats(statThree), 0.01);
+    assertEquals(0.5, costFunction.costFromArray(statThree), 0.01);
+  }
+
+  @Test(timeout =  30000)
+  public void testLosingRs() throws Exception {
+    int numNodes = 3;
+    int numRegions = 20;
+    int numRegionsPerServer = 3; //all servers except one
+    int numTables = 2;
+
+    Map<ServerName, List<HRegionInfo>> serverMap =
+        createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
+    List<ServerAndLoad> list = convertToList(serverMap);
+
+
+    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+    assertNotNull(plans);
+
+    // Apply the plan to the mock cluster.
+    List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
+
+    assertClusterAsBalanced(balancedCluster);
+
+    ServerName sn = serverMap.keySet().toArray(new ServerName[serverMap.size()])[0];
+
+    ServerName deadSn = new ServerName(sn.getHostname(), sn.getPort(), sn.getStartcode() -100);
+
+    serverMap.put(deadSn, new ArrayList<HRegionInfo>(0));
+
+    plans = loadBalancer.balanceCluster(serverMap);
+    assertNull(plans);
   }
 
   @Test (timeout = 20000)
@@ -190,7 +240,7 @@ public class TestStochasticLoadBalancer 
     int numRegions = 1000;
     int numRegionsPerServer = 40; //all servers except one
     int numTables = 10;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
   }
 
   @Test (timeout = 20000)
@@ -199,45 +249,92 @@ public class TestStochasticLoadBalancer 
     int numRegions = 2000;
     int numRegionsPerServer = 40; //all servers except one
     int numTables = 10;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
   }
 
-  @Test (timeout = 40000)
+  @Test (timeout = 20000)
+  public void testSmallCluster3() {
+    int numNodes = 20;
+    int numRegions = 2000;
+    int numRegionsPerServer = 1; // all servers except one
+    int numTables = 10;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, false /* max moves */);
+  }
+
+  @Test (timeout = 800000)
   public void testMidCluster() {
     int numNodes = 100;
     int numRegions = 10000;
-    int numRegionsPerServer = 60; //all servers except one
+    int numRegionsPerServer = 60; // all servers except one
     int numTables = 40;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
   }
 
-  @Test (timeout = 1200000)
+  @Test (timeout = 800000)
   public void testMidCluster2() {
     int numNodes = 200;
     int numRegions = 100000;
-    int numRegionsPerServer = 40; //all servers except one
+    int numRegionsPerServer = 40; // all servers except one
     int numTables = 400;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+    testWithCluster(numNodes,
+        numRegions,
+        numRegionsPerServer,
+        numTables,
+        false /* with this many regions, one run may not always reach the best balance */);
+  }
+
+
+  @Test (timeout = 800000)
+  public void testMidCluster3() {
+    int numNodes = 100;
+    int numRegions = 2000;
+    int numRegionsPerServer = 9; // all servers except one
+    int numTables = 110;
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
+    // TODO(eclark): Make sure that the tables are well distributed.
   }
 
   @Test
-  @Ignore
-  //TODO: This still does not finish, making the LoadBalancer unusable at this scale. We should solve this.
-  //There are two reasons so far;
-  // - It takes too long for iterating for all servers
-  // - Moving one region out of the loaded server only costs a slight decrease in the cost of regionCountSkewCost
-  // but also a slight increase on the moveCost. loadMultiplier / moveCostMultiplier is not high enough to bring down
-  // the total cost, so that the eager selection cannot continue. This can be solved by smt like
-  // http://en.wikipedia.org/wiki/Simulated_annealing instead of random walk with eager selection
   public void testLargeCluster() {
     int numNodes = 1000;
     int numRegions = 100000; //100 regions per RS
     int numRegionsPerServer = 80; //all servers except one
     int numTables = 100;
-    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables);
+    testWithCluster(numNodes, numRegions, numRegionsPerServer, numTables, true);
   }
 
-  protected void testWithCluster(int numNodes, int numRegions, int numRegionsPerServer, int numTables) {
+  protected void testWithCluster(int numNodes,
+                                 int numRegions,
+                                 int numRegionsPerServer,
+                                 int numTables,
+                                 boolean assertFullyBalanced) {
+    Map<ServerName, List<HRegionInfo>> serverMap =
+        createServerMap(numNodes, numRegions, numRegionsPerServer, numTables);
+
+    List<ServerAndLoad> list = convertToList(serverMap);
+    LOG.info("Mock Cluster : " + printMock(list) + " " + printStats(list));
+
+    // Run the balancer.
+    List<RegionPlan> plans = loadBalancer.balanceCluster(serverMap);
+    assertNotNull(plans);
+
+    // Check to see that this actually got to a stable place.
+    if (assertFullyBalanced) {
+      // Apply the plan to the mock cluster.
+      List<ServerAndLoad> balancedCluster = reconcile(list, plans, serverMap);
+
+      // Print out the cluster loads to make debugging easier.
+      LOG.info("Mock Balance : " + printMock(balancedCluster));
+      assertClusterAsBalanced(balancedCluster);
+      List<RegionPlan> secondPlans =  loadBalancer.balanceCluster(serverMap);
+      assertNull(secondPlans);
+    }
+  }
+
+  private Map<ServerName, List<HRegionInfo>> createServerMap(int numNodes,
+                                                             int numRegions,
+                                                             int numRegionsPerServer,
+                                                             int numTables) {
     //construct a cluster of numNodes, having  a total of numRegions. Each RS will hold
     //numRegionsPerServer many regions except for the last one, which will host all the
     //remaining regions
@@ -246,8 +343,6 @@ public class TestStochasticLoadBalancer 
       cluster[i] = numRegionsPerServer;
     }
     cluster[cluster.length - 1] = numRegions - ((cluster.length - 1) * numRegionsPerServer);
-
-    assertNotNull(loadBalancer.balanceCluster(mockClusterServers(cluster, numTables)));
+    return mockClusterServers(cluster, numTables);
   }
-
 }



Mime
View raw message