hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r989407 - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/master/handler/ src/main/java/org/apache/hadoop/hbase/ut...
Date Wed, 25 Aug 2010 23:59:27 GMT
Author: stack
Date: Wed Aug 25 23:59:26 2010
New Revision: 989407

URL: http://svn.apache.org/viewvc?rev=989407&view=rev
Log:
Added the new load balancer.

Fixed a good few tests.

M BRANCH_TODO.txt
  Added a question.
M src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
  Test was failing because it couldn't connect to ZooKeeper.
M src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
  Redid the loop; as written it could run forever.
M src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
  Constructor on LB changed.
M src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java
  How we judge whether a table is offline is different now; it's no longer the offline flag in the HTD.
M src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java
  This test expects the table to be disabled, but how we disable tables has changed.  Updated the test.
M src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
  Update our in-memory map with split info.  It was being ignored previously.
M src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
  Javadoc.  Moved duplicated code out into a method (addToServers) so it can be reused.
  On server shutdown, we need to clear any reference to the dead server from the
  regionPlans map; otherwise we could end up failing an assignment as we try to
  pass a region to the dead server. Added handleSplitReport.
  Added an expensive clone of the internal servers-to-regions Map for
  use by the balancer. Added a new method, balance, which the balancer uses
  to ask the AssignmentManager to act on plans the balancer created.
M src/main/java/org/apache/hadoop/hbase/master/HMaster.java
  Start up the balancer.
M src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
  Make LB a Chore.  Make RegionPlan include HRegionInfo.
M src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
  Added assignment of -ROOT- and .META. ahead of other assignments
  -- if this server was carrying them.
M src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
  Have processServerShutdown return whether the dead server was carrying catalog
  regions.
M src/main/java/org/apache/hadoop/hbase/util/HMerge.java
  Added an overload of merge with a flag for whether to check that the master is running.

Modified:
    hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
    hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java
    hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java

Modified: hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt (original)
+++ hbase/branches/0.90_master_rewrite/BRANCH_TODO.txt Wed Aug 25 23:59:26 2010
@@ -287,3 +287,6 @@ Later:
  Questions:
 
  If region in RIT, do I need to wait on log replay if region was in OPENING or PENDING_OPEN
state?
+
+ So on assign, if we fail -- say connection refused when we try open on the RS, regions state
remains offline -- who comes along and finds all offline and assigns?
+

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/catalog/CatalogTracker.java
Wed Aug 25 23:59:26 2010
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.NotServin
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.zookeeper.MetaNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -365,13 +366,17 @@ public class CatalogTracker {
    * @param hsi Server that has crashed/shutdown.
    * @throws InterruptedException
    * @throws KeeperException
+   * @return Pair of booleans; if this server was carrying root, then first
+   * boolean is set, if server was carrying meta, then second boolean set.
    */
-  public void processServerShutdown(final HServerInfo hsi)
+  public Pair<Boolean, Boolean> processServerShutdown(final HServerInfo hsi)
   throws InterruptedException, KeeperException {
+    Pair<Boolean, Boolean> result = new Pair<Boolean, Boolean>(false, false);
     HServerAddress rootHsa = getRootLocation();
     if (rootHsa == null) {
       LOG.info("-ROOT- is not assigned; continuing");
     } else if (hsi.getServerAddress().equals(rootHsa)) {
+      result.setFirst(true);
       LOG.info(hsi.getServerName() + " carrying -ROOT-; deleting " +
         "-ROOT- location from meta");
       RootLocationEditor.deleteRootLocation(this.zookeeper);
@@ -380,7 +385,9 @@ public class CatalogTracker {
     if (metaHsa == null) {
       LOG.info(".META. is not assigned; continuing");
     } else if (hsi.getServerAddress().equals(metaHsa)) {
+      result.setSecond(true);
       resetMetaLocation();
     }
+    return result;
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
Wed Aug 25 23:59:26 2010
@@ -24,14 +24,15 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.logging.Log;
@@ -59,8 +60,8 @@ import org.apache.hadoop.hbase.util.Thre
 import org.apache.hadoop.hbase.zookeeper.ZKAssign;
 import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.NodeAndData;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.io.Writable;
 import org.apache.zookeeper.KeeperException;
 
@@ -88,8 +89,9 @@ public class AssignmentManager extends Z
 
   /** Plans for region movement. */
   // TODO: When do plans get cleaned out?  Ever?
-  private final Map<String,RegionPlan> regionPlans =
-    new TreeMap<String,RegionPlan>();
+  // Its cleaned on server shutdown processing -- St.Ack
+  private final Map<String, RegionPlan> regionPlans =
+    new TreeMap<String, RegionPlan>();
 
   /** Set of tables that have been disabled. */
   private final Set<String> disabledTables =
@@ -98,14 +100,19 @@ public class AssignmentManager extends Z
   /**
    * Server to regions assignment map.
    * Contains the set of regions currently assigned to a given server.
+   * This Map and {@link #regions} are tied.  Always update this in tandem
+   * with the other under a lock on {@link #regions}
+   * @see #regions
    */
-  private final SortedMap<HServerInfo,Set<HRegionInfo>> servers =
-        new TreeMap<HServerInfo,Set<HRegionInfo>>();
+  private final NavigableMap<HServerInfo, List<HRegionInfo>> servers =
+    new TreeMap<HServerInfo, List<HRegionInfo>>();
 
   /**
    * Region to server assignment map.
    * Contains the server a given region is currently assigned to.
-   * This object should be used for all synchronization around servers/regions.
+   * This Map and {@link #servers} are tied.  Always update this in tandem
+   * with the other under a lock on {@link #regions}
+   * @see #servers
    */
   private final SortedMap<HRegionInfo,HServerInfo> regions =
     new TreeMap<HRegionInfo,HServerInfo>();
@@ -419,12 +426,7 @@ public class AssignmentManager extends Z
     }
     synchronized(regions) {
       regions.put(regionInfo, serverInfo);
-      Set<HRegionInfo> regionSet = servers.get(serverInfo);
-      if(regionSet == null) {
-        regionSet = new TreeSet<HRegionInfo>();
-        servers.put(serverInfo, regionSet);
-      }
-      regionSet.add(regionInfo);
+      addToServers(serverInfo, regionInfo);
     }
   }
 
@@ -443,7 +445,7 @@ public class AssignmentManager extends Z
     }
     synchronized(regions) {
       HServerInfo serverInfo = regions.remove(regionInfo);
-      Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+      List<HRegionInfo> serverRegions = servers.get(serverInfo);
       serverRegions.remove(regionInfo);
     }
   }
@@ -458,7 +460,7 @@ public class AssignmentManager extends Z
   public void setOffline(HRegionInfo regionInfo) {
     synchronized(regions) {
       HServerInfo serverInfo = regions.remove(regionInfo);
-      Set<HRegionInfo> serverRegions = servers.get(serverInfo);
+      List<HRegionInfo> serverRegions = servers.get(serverInfo);
       serverRegions.remove(regionInfo);
     }
   }
@@ -535,29 +537,31 @@ public class AssignmentManager extends Z
     RegionPlan plan;
     synchronized(regionPlans) {
       plan = regionPlans.get(encodedName);
-      if(plan == null) {
+      if (plan == null) {
         LOG.debug("No previous transition plan for " +
             state.getRegion().getRegionNameAsString() +
             " so generating a random one from " + serverManager.numServers() +
             " ( " + serverManager.getOnlineServers().size() + ") available servers");
-        plan = new RegionPlan(encodedName, null,
-            LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
+        plan = new RegionPlan(state.getRegion(), null,
+          LoadBalancer.randomAssignment(serverManager.getOnlineServersList()));
         regionPlans.put(encodedName, plan);
       }
     }
     try {
       // Send OPEN RPC. This can fail if the server on other end is is not up.
       serverManager.sendRegionOpen(plan.getDestination(), state.getRegion());
+      // Transition RegionState to PENDING_OPEN
+      state.update(RegionState.State.PENDING_OPEN);
     } catch (Throwable t) {
-      LOG.warn("Failed assignment of " + state.getRegion());
+      LOG.warn("Failed assignment of " +
+        state.getRegion().getRegionNameAsString() + " to " +
+        plan.getDestination(), t);
       // Clean out plan we failed execute and one that doesn't look like it'll
       // succeed anyways; we need a new plan!
       synchronized(regionPlans) {
         this.regionPlans.remove(encodedName);
       }
     }
-    // Transition RegionState to PENDING_OPEN
-    state.update(RegionState.State.PENDING_OPEN);
   }
 
   /**
@@ -686,7 +690,7 @@ public class AssignmentManager extends Z
       for(HRegionInfo region : regions) {
         LOG.debug("Assigning " + region.getRegionNameAsString() + " to " + server);
         String regionName = region.getEncodedName();
-        RegionPlan plan = new RegionPlan(regionName, null,server);
+        RegionPlan plan = new RegionPlan(region, null,server);
         regionPlans.put(regionName, plan);
         assign(region);
       }
@@ -715,15 +719,24 @@ public class AssignmentManager extends Z
       }
       HServerInfo serverInfo = serverManager.getHServerInfo(regionLocation);
       regions.put(regionInfo, serverInfo);
-      Set<HRegionInfo> regionSet = servers.get(serverInfo);
-      if(regionSet == null) {
-        regionSet = new TreeSet<HRegionInfo>();
-        servers.put(serverInfo, regionSet);
-      }
-      regionSet.add(regionInfo);
+      addToServers(serverInfo, regionInfo);
     }
   }
 
+  /*
+   * Presumes caller has taken care of necessary locking modifying servers Map.
+   * @param hsi
+   * @param hri
+   */
+  private void addToServers(final HServerInfo hsi, final HRegionInfo hri) {
+    List<HRegionInfo> hris = servers.get(hsi);
+    if (hris == null) {
+      hris = new ArrayList<HRegionInfo>();
+      servers.put(hsi, hris);
+    }
+    hris.add(hri);
+  }
+
   /**
    * Blocks until there are no regions in transition.  It is possible that there
    * are regions in transition immediately after this returns but guarantees
@@ -748,6 +761,13 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * @return True if regions in transition.
+   */
+  public boolean isRegionsInTransition() {
+    return !this.regionsInTransition.isEmpty();
+  }
+
+  /**
    * Checks if the specified table has been disabled by the user.
    * @param tableName
    * @return
@@ -910,6 +930,17 @@ public class AssignmentManager extends Z
    * @param hsi Server that went down.
    */
   public void processServerShutdown(final HServerInfo hsi) {
+    // Clean out any exisiting assignment plans for this server
+    synchronized (this.regionPlans) {
+      for (Iterator <Map.Entry<String, RegionPlan>> i =
+        this.regionPlans.entrySet().iterator(); i.hasNext();) {
+        Map.Entry<String, RegionPlan> e = i.next();
+        if (e.getValue().getDestination().equals(hsi)) {
+          // Use iterator's remove else we'll get CME.fail a
+          i.remove();
+        }
+      }
+    }
     synchronized (regionsInTransition) {
       // Iterate all regions in transition checking if were on this server
       final String serverName = hsi.getServerName();
@@ -937,6 +968,100 @@ public class AssignmentManager extends Z
     }
   }
 
+  /**
+   * Update inmemory structures.
+   * @param hsi Server that reported the split
+   * @param parent Parent region that was split
+   * @param a Daughter region A
+   * @param b Daughter region B
+   */
+  public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
+      final HRegionInfo a, final HRegionInfo b) {
+    synchronized (this.regions) {
+      checkRegion(hsi, parent, true);
+      checkRegion(hsi, a, false);
+      this.regions.put(a, hsi);
+      this.regions.put(b, hsi);
+      removeFromServers(hsi, parent, true);
+      removeFromServers(hsi, a, false);
+      removeFromServers(hsi, b, false);
+      addToServers(hsi, a);
+      addToServers(hsi, b);
+    }
+  }
+
+  /*
+   * Caller must hold locks on regions Map.
+   * @param hsi
+   * @param hri
+   * @param expected
+   */
+  private void checkRegion(final HServerInfo hsi, final HRegionInfo hri,
+      final boolean expected) {
+    HServerInfo serverInfo = regions.remove(hri);
+    if (expected) {
+      if (serverInfo == null) {
+        LOG.info("Region not on a server: " + hri.getRegionNameAsString());
+      }
+    } else {
+      if (serverInfo != null) {
+        LOG.warn("Region present on " + hsi + "; unexpected");
+      }
+    }
+  }
+
+  /*
+   * Caller must hold locks on servers Map.
+   * @param hsi
+   * @param hri
+   * @param expected
+   */
+  private void removeFromServers(final HServerInfo hsi, final HRegionInfo hri,
+      final boolean expected) {
+    List<HRegionInfo> serverRegions = this.servers.get(hsi);
+    boolean removed = serverRegions.remove(hri);
+    if (expected) {
+      if (!removed) {
+        LOG.warn(hri.getRegionNameAsString() + " not found on " + hsi +
+          "; unexpected");
+      }
+    } else {
+      if (removed) {
+        LOG.warn(hri.getRegionNameAsString() + " found on " + hsi +
+        "; unexpected");
+      }
+    }
+  }
+
+  /**
+   * @return A clone of current assignments
+   */
+  Map<HServerInfo, List<HRegionInfo>> getAssignments() {
+    // This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
+    // Can't let out original since it can change and at least the loadbalancer
+    // wants to iterate this exported list.  We need to synchronize on regions
+    // since all access to this.servers is under a lock on this.regions.
+    Map<HServerInfo, List<HRegionInfo>> result = null;
+    synchronized (this.regions) {
+      result = new HashMap<HServerInfo, List<HRegionInfo>>(this.servers.size());
+      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.servers.entrySet())
{
+        List<HRegionInfo> shallowCopy = new ArrayList<HRegionInfo>(e.getValue());
+        result.put(e.getKey(), shallowCopy);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * @param plan Plan to execute.
+   */
+  void balance(final RegionPlan plan) {
+    synchronized (this.regionPlans) {
+      this.regionPlans.put(plan.getRegionName(), plan);
+    }
+    unassign(plan.getRegionInfo());
+  }
+
   public static class RegionState implements Writable {
     private HRegionInfo region;
 

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
Wed Aug 25 23:59:26 2010
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hbase.util.InfoServer;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
@@ -157,6 +158,8 @@ implements HMasterInterface, HMasterRegi
   // Instance of the hbase executor service.
   ExecutorService executorService;
 
+  private LoadBalancer balancer;
+
   /**
    * Initializes the HMaster. The steps are as follows:
    *
@@ -297,6 +300,7 @@ implements HMasterInterface, HMasterRegi
       }
     }
     this.rpcServer.stop();
+    this.balancer.interrupt();
     this.activeMasterManager.stop();
     this.zooKeeper.close();
     this.executorService.shutdown();
@@ -423,6 +427,14 @@ implements HMasterInterface, HMasterRegi
         this.infoServer.setAttribute(MASTER, this);
         this.infoServer.start();
       }
+
+      // Start up the load balancer
+      String name = getServerName() + "-loadbalancer";
+      int period = getConfiguration().getInt("hbase.balancer.period", 300000);
+      this.balancer = new LoadBalancer(name, period, this,
+        this.assignmentManager);
+      Threads.setDaemonThreadRunning(this.balancer, name);
+
       // Start the server last so everything else is running before we start
       // receiving requests.
       this.rpcServer.start();

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/LoadBalancer.java
Wed Aug 25 23:59:26 2010
@@ -31,45 +31,66 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.Stoppable;
 
 /**
  * Makes decisions about the placement and movement of Regions across
  * RegionServers.
  *
- * Cluster-wide load balancing will occur only when there are no regions in
+ * <p>Cluster-wide load balancing will occur only when there are no regions in
  * transition and according to a fixed period of a time using {@link #balanceCluster(Map)}.
  *
- * Inline region placement with {@link #immediateAssignment} can be used when
+ * <p>Inline region placement with {@link #immediateAssignment} can be used when
  * the Master needs to handle closed regions that it currently does not have
  * a destination set for.  This can happen during master failover.
  *
- * On cluster startup, {@link #bulkAssignment} can be used to determine
+ * <p>On cluster startup, {@link #bulkAssignment} can be used to determine
  * locations for all Regions in a cluster.
+ * 
+ * <p>This classes produces plans for the {@link AssignmentManager} to execute.
  */
-public class LoadBalancer {
+public class LoadBalancer extends Chore {
   private static final Log LOG = LogFactory.getLog(LoadBalancer.class);
-
-  // Number of seconds between each run of the load balancer
-  private final long balancerPeriod;
-
   private static final Random rand = new Random();
+  private final AssignmentManager assignmentManager;
 
   /**
    * Instantiate the load balancer with the specified configuration.
    *
    * This sets configuration parameters to be used by the balancing algorithms
    * and launches a background thread to perform periodic load balancing.
-   * @param conf
+   * @param stoppable
+   * @param period
+   * @param name Name for this LB thread.
+   * @param assignmentManager
    */
-  public LoadBalancer(Configuration conf) {
-    balancerPeriod = conf.getLong("hbase.balancer.period", 300000);
+  public LoadBalancer(final String name, final int period,
+      final Stoppable stoppable, final AssignmentManager assignmentManager) {
+    super(name, period, stoppable);
+    this.assignmentManager = assignmentManager;
+  }
+
+  @Override
+  protected void chore() {
+    if (this.assignmentManager.isRegionsInTransition()) {
+      LOG.debug("Not running balancer because regions in transition: " +
+        this.assignmentManager.getRegionsInTransition());
+      return;
+    }
+    Map<HServerInfo, List<HRegionInfo>> assignments =
+      this.assignmentManager.getAssignments();
+    List<RegionPlan> plans = balanceCluster(assignments);
+    if (plans == null || plans.isEmpty()) return;
+    for (RegionPlan plan: plans) {
+      this.assignmentManager.balance(plan);
+    }
   }
 
   /**
@@ -196,8 +217,7 @@ public class LoadBalancer {
       List<HRegionInfo> regions = server.getValue();
       int numToOffload = Math.min(regionCount - max, regions.size());
       for(int i=0; i<numToOffload; i++) {
-        regionsToMove.add(new RegionPlan(regions.get(i).getEncodedName(),
-            serverInfo, null));
+        regionsToMove.add(new RegionPlan(regions.get(i), serverInfo, null));
       }
       serverBalanceInfo.put(serverInfo,
           new BalanceInfo(numToOffload, (-1)*numToOffload));
@@ -250,8 +270,7 @@ public class LoadBalancer {
         int idx =
           balanceInfo == null ? 0 : balanceInfo.getNextRegionForUnload();
         HRegionInfo region = server.getValue().get(idx);
-        regionsToMove.add(new RegionPlan(region.getEncodedName(),
-            server.getKey(), null));
+        regionsToMove.add(new RegionPlan(region, server.getKey(), null));
         if(--neededRegions == 0) {
           // No more regions needed, done shedding
           break;
@@ -511,7 +530,7 @@ public class LoadBalancer {
   }
 
   public static HServerInfo randomAssignment(List<HServerInfo> servers) {
-    if(servers == null || servers.isEmpty()) {
+    if (servers == null || servers.isEmpty()) {
       LOG.warn("Wanted to do random assignment but no servers to assign to");
       return null;
     }
@@ -530,7 +549,7 @@ public class LoadBalancer {
    */
   public static class RegionPlan implements Comparable<RegionPlan> {
 
-    private final String regionName;
+    private final HRegionInfo hri;
     private final HServerInfo source;
     private HServerInfo dest;
 
@@ -541,12 +560,12 @@ public class LoadBalancer {
      * Destination server can be instantiated as null and later set
      * with {@link #setDestination(HServerInfo)}.
      *
-     * @param region region to be moved
+     * @param hri region to be moved
      * @param source regionserver region should be moved from
      * @param dest regionserver region should be moved to
      */
-    public RegionPlan(String regionName, HServerInfo source, HServerInfo dest) {
-      this.regionName = regionName;
+    public RegionPlan(final HRegionInfo hri, HServerInfo source, HServerInfo dest) {
+      this.hri = hri;
       this.source = source;
       this.dest = dest;
     }
@@ -579,7 +598,11 @@ public class LoadBalancer {
      * @return region name
      */
     public String getRegionName() {
-      return regionName;
+      return this.hri.getRegionNameAsString();
+    }
+ 
+    public HRegionInfo getRegionInfo() {
+      return this.hri;
     }
 
     /**
@@ -588,7 +611,7 @@ public class LoadBalancer {
      */
     @Override
     public int compareTo(RegionPlan o) {
-      return regionName.compareTo(o.getRegionName());
+      return getRegionName().compareTo(o.getRegionName());
     }
   }
-}
+}
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
Wed Aug 25 23:59:26 2010
@@ -269,8 +269,8 @@ public class ServerManager {
       LOG.info("Received " + msg);
       switch (msg.getType()) {
       case REGION_SPLIT:
-        // Nothing to do?
-        LOG.warn("TODO: update inmemory region state w/ split info " + msg);
+        this.services.getAssignmentManager().handleSplitReport(serverInfo,
+            msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB());
         break;
 
         default:

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/master/handler/ServerShutdownHandler.java
Wed Aug 25 23:59:26 2010
@@ -27,10 +27,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.master.DeadServer;
 import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.zookeeper.KeeperException;
 
 
@@ -54,8 +56,10 @@ public class ServerShutdownHandler exten
 
   @Override
   public void process() throws IOException {
+    Pair<Boolean, Boolean> carryingCatalog = null;
     try {
-      this.server.getCatalogTracker().processServerShutdown(this.hsi);
+      carryingCatalog =
+        this.server.getCatalogTracker().processServerShutdown(this.hsi);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new IOException("Interrupted", e);
@@ -74,6 +78,27 @@ public class ServerShutdownHandler exten
     // splitting.
     this.services.getAssignmentManager().processServerShutdown(this.hsi);
 
+    // Assign root and meta if we were carrying them.
+    if (carryingCatalog.getFirst()) { // -ROOT-
+      try {
+        this.services.getAssignmentManager().assignRoot();
+      } catch (KeeperException e) {
+        this.server.abort("In server shutdown processing, assigning root", e);
+        throw new IOException("Aborting", e);
+      }
+    }
+    if (carryingCatalog.getSecond()) { // .META.
+      this.services.getAssignmentManager().assignMeta();
+    }
+
+    // Wait on meta to come online; we need it to progress.
+    try {
+      this.server.getCatalogTracker().waitForMeta();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new IOException("Interrupted", e);
+    }
+
     NavigableSet<HRegionInfo> hris =
       MetaReader.getServerRegions(this.server.getCatalogTracker(), this.hsi);
     LOG.info("Reassigning the " + hris.size() + " region(s) that " + serverName +

Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
(original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/util/HMerge.java
Wed Aug 25 23:59:26 2010
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
@@ -79,8 +80,32 @@ class HMerge {
   public static void merge(Configuration conf, FileSystem fs,
     final byte [] tableName)
   throws IOException {
-    HConnection connection = HConnectionManager.getConnection(conf);
-    boolean masterIsRunning = connection.isMasterRunning();
+    merge(conf, fs, tableName, true);
+  }
+
+  /**
+   * Scans the table and merges two adjacent regions if they are small. This
+   * only happens when a lot of rows are deleted.
+   *
+   * When merging the META region, the HBase instance must be offline.
+   * When merging a normal table, the HBase instance must be online, but the
+   * table must be disabled.
+   *
+   * @param conf        - configuration object for HBase
+   * @param fs          - FileSystem where regions reside
+   * @param tableName   - Table to be compacted
+   * @param testMasterRunning True if we are to verify master is down before
+   * running merge
+   * @throws IOException
+   */
+  public static void merge(Configuration conf, FileSystem fs,
+    final byte [] tableName, final boolean testMasterRunning)
+  throws IOException {
+    boolean masterIsRunning = false;
+    if (testMasterRunning) {
+      HConnection connection = HConnectionManager.getConnection(conf);
+      masterIsRunning = connection.isMasterRunning();
+    }
     HConnectionManager.deleteConnectionInfo(conf, false);
     if (Bytes.equals(tableName, HConstants.META_TABLE_NAME)) {
       if (masterIsRunning) {
@@ -93,6 +118,10 @@ class HMerge {
         throw new IllegalStateException(
             "HBase instance must be running to merge a normal table");
       }
+      HBaseAdmin admin = new HBaseAdmin(conf);
+      if (!admin.isTableDisabled(tableName)) {
+        throw new TableNotDisabledException(tableName);
+      }
       new OnlineMerger(conf, fs, tableName).process();
     }
   }
@@ -231,7 +260,6 @@ class HMerge {
         if (!Bytes.equals(region.getTableDesc().getName(), this.tableName)) {
           return null;
         }
-        checkOfflined(region);
         return region;
       } catch (IOException e) {
         e = RemoteExceptionHandler.checkIOException(e);
@@ -241,14 +269,6 @@ class HMerge {
       }
     }
 
-    protected void checkOfflined(final HRegionInfo hri)
-    throws TableNotDisabledException {
-      if (!hri.isOffline()) {
-        throw new TableNotDisabledException("Region " +
-          hri.getRegionNameAsString() + " is not disabled");
-      }
-    }
-
     /*
      * Check current row has a HRegionInfo.  Skip to next row if HRI is empty.
      * @return A Map of the row content else null if we are off the end.

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java	(original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/TestMultiParallelPut.java	Wed Aug 25 23:59:26 2010
@@ -71,10 +71,7 @@ public class TestMultiParallelPut extend
   }
 
   public void doATest(boolean doAbort) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    conf.set(HConstants.ZOOKEEPER_QUORUM, "localhost:" +
-        super.zooKeeperCluster.getClientPort());
-    HTable table = new HTable(TEST_TABLE);
+    HTable table = new HTable(conf, TEST_TABLE);
     table.setAutoFlush(false);
     table.setWriteBufferSize(10 * 1024 * 1024);
     for ( byte [] k : keys ) {

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java	(original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestLoadBalancer.java	Wed Aug 25 23:59:26 2010
@@ -35,12 +35,11 @@ import java.util.TreeSet;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.master.LoadBalancer.RegionPlan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.BeforeClass;
@@ -49,16 +48,21 @@ import org.junit.Test;
 public class TestLoadBalancer {
   private static final Log LOG = LogFactory.getLog(TestLoadBalancer.class);
 
-  private static Configuration conf;
-
   private static LoadBalancer loadBalancer;
 
   private static Random rand;
 
   @BeforeClass
   public static void beforeAllTests() throws Exception {
-    conf = HBaseConfiguration.create();
-    loadBalancer = new LoadBalancer(conf);
+    loadBalancer = new LoadBalancer("test", 1, new Stoppable() {
+      @Override
+      public void stop(String why) {
+      }
+      @Override
+      public boolean isStopped() {
+        return false;
+      }
+    }, (AssignmentManager)null);
     rand = new Random();
   }
 
@@ -209,7 +213,7 @@ public class TestLoadBalancer {
       List<HRegionInfo> regions = randomRegions(mock[0]);
       List<HServerInfo> servers = randomServers(mock[1], 0);
       Map<HRegionInfo,HServerInfo> assignments =
-        loadBalancer.immediateAssignment(regions, servers);
+        LoadBalancer.immediateAssignment(regions, servers);
       assertImmediateAssignment(regions, servers, assignments);
       returnRegions(regions);
       returnServers(servers);
@@ -244,7 +248,7 @@ public class TestLoadBalancer {
       List<HRegionInfo> regions = randomRegions(mock[0]);
       List<HServerInfo> servers = randomServers(mock[1], 0);
       Map<HServerInfo,List<HRegionInfo>> assignments =
-        loadBalancer.bulkAssignment(regions, servers);
+        LoadBalancer.bulkAssignment(regions, servers);
       float average = (float)regions.size()/servers.size();
       int min = (int)Math.floor(average);
       int max = (int)Math.ceil(average);

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java	(original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/master/TestZKBasedOpenCloseRegion.java	Wed Aug 25 23:59:26 2010
@@ -100,9 +100,11 @@ public class TestZKBasedOpenCloseRegion 
     HRegionServer regionServer =
       TEST_UTIL.getHBaseCluster().getRegionServer(rsIdx);
     Collection<HRegionInfo> regions = regionServer.getOnlineRegions();
-    HRegionInfo hri;
-    while((hri = regions.iterator().next()) != null) {
-      if(!hri.isMetaRegion() && !hri.isRootRegion()) {
+    HRegionInfo hri = null;
+    for (HRegionInfo i: regions) {
+      LOG.info(i.getRegionNameAsString());
+      if (!i.isMetaRegion()) {
+        hri = i;
         break;
       }
     }

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java	(original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeMeta.java	Wed Aug 25 23:59:26 2010
@@ -43,6 +43,6 @@ public class TestMergeMeta extends Abstr
    */
   public void testMergeMeta() throws IOException {
     assertNotNull(dfsCluster);
-    HMerge.merge(conf, dfsCluster.getFileSystem(), HConstants.META_TABLE_NAME);
+    HMerge.merge(conf, dfsCluster.getFileSystem(), HConstants.META_TABLE_NAME, false);
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java?rev=989407&r1=989406&r2=989407&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java	(original)
+++ hbase/branches/0.90_master_rewrite/src/test/java/org/apache/hadoop/hbase/util/TestMergeTable.java	Wed Aug 25 23:59:26 2010
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.AbstractMergeTestBase;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.util.HMerge;
 
 /**
@@ -35,6 +36,8 @@ public class TestMergeTable extends Abst
    */
   public void testMergeTable() throws IOException {
     assertNotNull(dfsCluster);
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.disableTable(desc.getName());
     HMerge.merge(conf, dfsCluster.getFileSystem(), desc.getName());
   }
 }
\ No newline at end of file



Mime
View raw message