hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1097275 [4/8] - in /hbase/trunk: ./ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/o...
Date Wed, 27 Apr 2011 23:12:44 GMT
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java Wed Apr 27 23:12:42 2011
@@ -24,6 +24,7 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -43,11 +44,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.catalog.MetaReader;
@@ -124,8 +124,8 @@ public class AssignmentManager extends Z
    * with the other under a lock on {@link #regions}
    * @see #regions
    */
-  private final NavigableMap<HServerInfo, List<HRegionInfo>> servers =
-    new TreeMap<HServerInfo, List<HRegionInfo>>();
+  private final NavigableMap<ServerName, List<HRegionInfo>> servers =
+    new TreeMap<ServerName, List<HRegionInfo>>();
 
   /**
    * Region to server assignment map.
@@ -134,8 +134,8 @@ public class AssignmentManager extends Z
    * with the other under a lock on {@link #regions}
    * @see #servers
    */
-  private final SortedMap<HRegionInfo,HServerInfo> regions =
-    new TreeMap<HRegionInfo,HServerInfo>();
+  private final SortedMap<HRegionInfo, ServerName> regions =
+    new TreeMap<HRegionInfo, ServerName>();
 
   private final ExecutorService executorService;
 
@@ -169,6 +169,26 @@ public class AssignmentManager extends Z
   }
 
   /**
+   * Compute the average load across all region servers.
+   * Currently, this uses a very naive computation - just uses the number of
+   * regions being served, ignoring stats about number of requests.
+   * @return the average load, or NaN if {@link #servers} is empty
+   */
+  double getAverageLoad() {
+    int totalLoad = 0;
+    int numServers = 0;
+    // Sync on this.regions because access to this.servers always synchronizes
+    // in this order.
+    synchronized (this.regions) {
+      for (Map.Entry<ServerName, List<HRegionInfo>> e: servers.entrySet()) {
+        numServers++;
+        totalLoad += e.getValue().size();
+      }
+    }
+    return (double)totalLoad / (double)numServers;
+  }
+
+  /**
    * @return Instance of ZKTable.
    */
   public ZKTable getZKTable() {
@@ -191,33 +211,31 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Handle failover.  Restore state from META and ZK.  Handle any regions in
-   * transition.  Presumes <code>.META.</code> and <code>-ROOT-</code> deployed.
-   * @throws KeeperException
+   * Called on startup.
+   * Figures whether this is a fresh cluster start or we are joining an extant running cluster.
    * @throws IOException
-   * @throws InterruptedException 
+   * @throws KeeperException
+   * @throws InterruptedException
    */
-  void processFailover() throws KeeperException, IOException, InterruptedException {
+  void joinCluster() throws IOException, KeeperException, InterruptedException {
     // Concurrency note: In the below the accesses on regionsInTransition are
     // outside of a synchronization block where usually all accesses to RIT are
     // synchronized.  The presumption is that in this case it is safe since this
     // method is being played by a single thread on startup.
 
-    // TODO: Check list of user regions and their assignments against regionservers.
     // TODO: Regions that have a null location and are not in regionsInTransitions
     // need to be handled.
 
     // Add -ROOT- and .META. on regions map.  They must be deployed if we got
-    // this far.  Caller takes care of it.
-    HServerInfo hsi =
-      this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation());
-    regionOnline(HRegionInfo.FIRST_META_REGIONINFO, hsi);
-    hsi = this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation());
-    regionOnline(HRegionInfo.ROOT_REGIONINFO, hsi);
+    // this far.
+    ServerName sn = this.catalogTracker.getMetaLocation();
+    regionOnline(HRegionInfo.FIRST_META_REGIONINFO, sn);
+    sn = this.catalogTracker.getRootLocation();
+    regionOnline(HRegionInfo.ROOT_REGIONINFO, sn);
 
     // Scan META to build list of existing regions, servers, and assignment
     // Returns servers who have not checked in (assumed dead) and their regions
-    Map<HServerInfo,List<Pair<HRegionInfo,Result>>> deadServers =
+    Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers =
       rebuildUserRegions();
     // Process list of dead servers
     processDeadServers(deadServers);
@@ -227,15 +245,36 @@ public class AssignmentManager extends Z
 
   public void processRegionsInTransition() throws KeeperException, IOException {
     List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
-        watcher.assignmentZNode);
-    if (nodes.isEmpty()) {
-      LOG.info("No regions in transition in ZK to process on failover");
-      return;
+      watcher.assignmentZNode);
+    // Run through all regions.  If they are not assigned and not in RIT, then
+    // it's a clean cluster startup, else it's a failover.
+    boolean userRegionsOutOnCluster = false;
+    for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
+      if (!e.getKey().isMetaRegion() && e.getValue() != null) {
+        LOG.debug("Found " + e + " out on cluster");
+        userRegionsOutOnCluster = true;
+        break;
+      }
+      if (nodes.contains(e.getKey().getEncodedName())) {
+        LOG.debug("Found " + e + " in RITs");
+        userRegionsOutOnCluster = true;
+        break;
+      }
     }
-    LOG.info("Failed-over master needs to process " + nodes.size() +
-        " regions in transition");
-    for (String encodedRegionName: nodes) {
-      processRegionInTransition(encodedRegionName, null);
+
+    // If we found user regions out on cluster, it's a failover.
+    if (userRegionsOutOnCluster) {
+      LOG.info("Found regions out on cluster or in RIT; failover");
+      processDeadServers(deadServers);
+      if (!nodes.isEmpty()) {
+        for (String encodedRegionName: nodes) {
+          processRegionInTransition(encodedRegionName, null);
+        }
+      }
+    } else {
+      // Fresh cluster startup.
+      cleanoutUnassigned();
+      assignAllUserRegions();
     }
   }
 
@@ -264,10 +303,10 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * Process failover of <code>encodedName</code>.  Look in
+   * Process failover of region <code>encodedRegionName</code>.  Look in RIT.
    * @param encodedRegionName Region to process failover for.
-   * @param encodedRegionName RegionInfo.  If null we'll go get it from meta table.
-   * @return
+   * @param regionInfo If null we'll go get it from meta table.
+   * @return True if we processed <code>regionInfo</code> as a RIT.
    * @throws KeeperException
    * @throws IOException
    */
@@ -278,7 +317,7 @@ public class AssignmentManager extends Z
     if (data == null) return false;
     HRegionInfo hri = regionInfo;
     if (hri == null) {
-      Pair<HRegionInfo, HServerAddress> p =
+      Pair<HRegionInfo, ServerName> p =
         MetaReader.getRegion(catalogTracker, data.getRegionName());
       if (p == null) return false;
       hri = p.getFirst();
@@ -327,17 +366,18 @@ public class AssignmentManager extends Z
         // Region is opened, insert into RIT and handle it
         regionsInTransition.put(encodedRegionName, new RegionState(
             regionInfo, RegionState.State.OPENING, data.getStamp()));
-        HServerInfo hsi = serverManager.getServerInfo(data.getServerName());
+        ServerName sn =
+          data.getOrigin() == null? null: data.getOrigin();
         // hsi could be null if this server is no longer online.  If
         // that the case, just let this RIT timeout; it'll be assigned
         // to new server then.
-        if (hsi == null) {
+        if (sn == null) {
           LOG.warn("Region in transition " + regionInfo.getEncodedName() +
-            " references a server no longer up " + data.getServerName() +
-            "; letting RIT timeout so will be assigned elsewhere");
+            " references a null server; letting RIT timeout so will be " +
+            "assigned elsewhere");
           break;
         }
-        new OpenedRegionHandler(master, this, regionInfo, hsi).process();
+        new OpenedRegionHandler(master, this, regionInfo, sn).process();
         break;
       }
     }
@@ -354,18 +394,19 @@ public class AssignmentManager extends Z
    */
   private void handleRegion(final RegionTransitionData data) {
     synchronized(regionsInTransition) {
-      if (data == null || data.getServerName() == null) {
+      if (data == null || data.getOrigin() == null) {
         LOG.warn("Unexpected NULL input " + data);
         return;
       }
+      ServerName sn = data.getOrigin();
       // Check if this is a special HBCK transition
-      if (data.getServerName().equals(HConstants.HBCK_CODE_NAME)) {
+      if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) {
         handleHBCK(data);
         return;
       }
       // Verify this is a known server
-      if (!serverManager.isServerOnline(data.getServerName()) &&
-          !this.master.getServerName().equals(data.getServerName())) {
+      if (!serverManager.isServerOnline(sn) &&
+          !this.master.getServerName().equals(sn)) {
         LOG.warn("Attempted to handle region transition for server but " +
           "server is not online: " + data.getRegionName());
         return;
@@ -387,7 +428,7 @@ public class AssignmentManager extends Z
 
         case RS_ZK_REGION_SPLITTING:
           if (!isInStateForSplitting(regionState)) break;
-          addSplittingToRIT(data.getServerName(), encodedName);
+          addSplittingToRIT(sn.toString(), encodedName);
           break;
 
         case RS_ZK_REGION_SPLIT:
@@ -396,9 +437,9 @@ public class AssignmentManager extends Z
           // If null, add SPLITTING state before going to SPLIT
           if (regionState == null) {
             LOG.info("Received SPLIT for region " + prettyPrintedRegionName +
-              " from server " + data.getServerName() +
+              " from server " + sn +
               " but region was not first in SPLITTING state; continuing");
-            addSplittingToRIT(data.getServerName(), encodedName);
+            addSplittingToRIT(sn.toString(), encodedName);
           }
           // Check it has daughters.
           byte [] payload = data.getPayload();
@@ -412,14 +453,13 @@ public class AssignmentManager extends Z
           }
           assert daughters.size() == 2;
           // Assert that we can get a serverinfo for this server.
-          HServerInfo hsi = getAndCheckHServerInfo(data.getServerName());
-          if (hsi == null) {
-            LOG.error("Dropped split! No HServerInfo for " + data.getServerName());
+          if (!this.serverManager.isServerOnline(sn)) {
+            LOG.error("Dropped split! ServerName=" + sn + " unknown.");
             break;
           }
           // Run handler to do the rest of the SPLIT handling.
           this.executorService.submit(new SplitRegionHandler(master, this,
-            regionState.getRegion(), hsi, daughters));
+            regionState.getRegion(), sn, daughters));
           break;
 
         case RS_ZK_REGION_CLOSING:
@@ -428,7 +468,7 @@ public class AssignmentManager extends Z
           if (regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
-              " from server " + data.getServerName() + " but region was in " +
+              " from server " + data.getOrigin() + " but region was in " +
               " the state " + regionState + " and not " +
               "in expected PENDING_CLOSE or CLOSING states");
             return;
@@ -442,7 +482,7 @@ public class AssignmentManager extends Z
           if (regionState == null ||
               (!regionState.isPendingClose() && !regionState.isClosing())) {
             LOG.warn("Received CLOSED for region " + prettyPrintedRegionName +
-                " from server " + data.getServerName() + " but region was in " +
+                " from server " + data.getOrigin() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_CLOSE or CLOSING states");
             return;
@@ -462,7 +502,7 @@ public class AssignmentManager extends Z
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
             LOG.warn("Received OPENING for region " +
                 prettyPrintedRegionName +
-                " from server " + data.getServerName() + " but region was in " +
+                " from server " + data.getOrigin() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_OPEN or OPENING states");
             return;
@@ -477,7 +517,7 @@ public class AssignmentManager extends Z
               (!regionState.isPendingOpen() && !regionState.isOpening())) {
             LOG.warn("Received OPENED for region " +
                 prettyPrintedRegionName +
-                " from server " + data.getServerName() + " but region was in " +
+                " from server " + data.getOrigin() + " but region was in " +
                 " the state " + regionState + " and not " +
                 "in expected PENDING_OPEN or OPENING states");
             return;
@@ -486,7 +526,7 @@ public class AssignmentManager extends Z
           regionState.update(RegionState.State.OPEN, data.getStamp());
           this.executorService.submit(
             new OpenedRegionHandler(master, this, regionState.getRegion(),
-              this.serverManager.getServerInfo(data.getServerName())));
+              data.getOrigin()));
           break;
       }
     }
@@ -524,12 +564,6 @@ public class AssignmentManager extends Z
     return true;
   }
 
-  private HServerInfo getAndCheckHServerInfo(final String serverName) {
-    HServerInfo hsi = this.serverManager.getServerInfo(serverName);
-    if (hsi == null) LOG.debug("No serverinfo for " + serverName);
-    return hsi;
-  }
-
   /**
    * @param serverName
    * @param encodedName
@@ -572,9 +606,9 @@ public class AssignmentManager extends Z
    */
   private HRegionInfo findHRegionInfo(final String serverName,
       final String encodedName) {
-    HServerInfo hsi = getAndCheckHServerInfo(serverName);
-    if (hsi == null) return null;
-    List<HRegionInfo> hris = this.servers.get(hsi);
+    ServerName sn = new ServerName(serverName);
+    if (!this.serverManager.isServerOnline(sn)) return null;
+    List<HRegionInfo> hris = this.servers.get(sn);
     HRegionInfo foundHri = null;
     for (HRegionInfo hri: hris) {
       if (hri.getEncodedName().equals(encodedName)) {
@@ -594,7 +628,7 @@ public class AssignmentManager extends Z
   private void handleHBCK(RegionTransitionData data) {
     String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
     LOG.info("Handling HBCK triggered transition=" + data.getEventType() +
-      ", server=" + data.getServerName() + ", region=" +
+      ", server=" + data.getOrigin() + ", region=" +
       HRegionInfo.prettyPrint(encodedName));
     RegionState regionState = regionsInTransition.get(encodedName);
     switch (data.getEventType()) {
@@ -741,9 +775,9 @@ public class AssignmentManager extends Z
    * <p>
    * Used when a region has been successfully opened on a region server.
    * @param regionInfo
-   * @param serverInfo
+   * @param sn
    */
-  public void regionOnline(HRegionInfo regionInfo, HServerInfo serverInfo) {
+  public void regionOnline(HRegionInfo regionInfo, ServerName sn) {
     synchronized (this.regionsInTransition) {
       RegionState rs =
         this.regionsInTransition.remove(regionInfo.getEncodedName());
@@ -753,22 +787,22 @@ public class AssignmentManager extends Z
     }
     synchronized (this.regions) {
       // Add check
-      HServerInfo hsi = this.regions.get(regionInfo);
-      if (hsi != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
-        " on " + hsi);
-      this.regions.put(regionInfo, serverInfo);
-      addToServers(serverInfo, regionInfo);
+      ServerName oldSn = this.regions.get(regionInfo);
+      if (oldSn != null) LOG.warn("Overwriting " + regionInfo.getEncodedName() +
+        " on " + oldSn + " with " + sn);
+      this.regions.put(regionInfo, sn);
+      addToServers(sn, regionInfo);
       this.regions.notifyAll();
     }
     // Remove plan if one.
     clearRegionPlan(regionInfo);
     // Update timers for all regions in transition going against this server.
-    updateTimers(serverInfo);
+    updateTimers(sn);
   }
 
   /**
    * Touch timers for all regions in transition that have the passed
-   * <code>hsi</code> in common.
+   * <code>sn</code> in common.
    * Call this method whenever a server checks in.  Doing so helps the case where
    * a new regionserver has joined the cluster and its been given 1k regions to
    * open.  If this method is tickled every time the region reports in a
@@ -777,9 +811,9 @@ public class AssignmentManager extends Z
    * as part of bulk assign -- there we have a different mechanism for extending
    * the regions in transition timer (we turn it off temporarily -- because
    * there is no regionplan involved when bulk assigning.
-   * @param hsi
+   * @param sn
    */
-  private void updateTimers(final HServerInfo hsi) {
+  private void updateTimers(final ServerName sn) {
     // This loop could be expensive.
     // First make a copy of current regionPlan rather than hold sync while
     // looping because holding sync can cause deadlock.  Its ok in this loop
@@ -789,7 +823,7 @@ public class AssignmentManager extends Z
       copy.putAll(this.regionPlans);
     }
     for (Map.Entry<String, RegionPlan> e: copy.entrySet()) {
-      if (!e.getValue().getDestination().equals(hsi)) continue;
+      if (!e.getValue().getDestination().equals(sn)) continue;
       RegionState rs = null;
       synchronized (this.regionsInTransition) {
         rs = this.regionsInTransition.get(e.getKey());
@@ -828,11 +862,11 @@ public class AssignmentManager extends Z
    */
   public void setOffline(HRegionInfo regionInfo) {
     synchronized (this.regions) {
-      HServerInfo serverInfo = this.regions.remove(regionInfo);
-      if (serverInfo == null) return;
-      List<HRegionInfo> serverRegions = this.servers.get(serverInfo);
+      ServerName sn = this.regions.remove(regionInfo);
+      if (sn == null) return;
+      List<HRegionInfo> serverRegions = this.servers.get(sn);
       if (!serverRegions.remove(regionInfo)) {
-        LOG.warn("No " + regionInfo + " on " + serverInfo);
+        LOG.warn("No " + regionInfo + " on " + sn);
       }
     }
   }
@@ -906,10 +940,10 @@ public class AssignmentManager extends Z
    * @param destination
    * @param regions Regions to assign.
    */
-  void assign(final HServerInfo destination,
+  void assign(final ServerName destination,
       final List<HRegionInfo> regions) {
     LOG.debug("Bulk assigning " + regions.size() + " region(s) to " +
-      destination.getServerName());
+      destination.toString());
 
     List<RegionState> states = new ArrayList<RegionState>(regions.size());
     synchronized (this.regionsInTransition) {
@@ -932,14 +966,19 @@ public class AssignmentManager extends Z
     for (int oldCounter = 0; true;) {
       int count = counter.get();
       if (oldCounter != count) {
-        LOG.info(destination.getServerName() + " unassigned znodes=" + count +
+        LOG.info(destination.toString() + " unassigned znodes=" + count +
           " of total=" + total);
         oldCounter = count;
       }
       if (count == total) break;
       Threads.sleep(1);
     }
+    // Move on to open regions.
     try {
+      // Send OPEN RPC. This can fail if the server on other end is not up.
+      // If we fail, fail the startup by aborting the server.  There is one
+      // exception we will tolerate: ServerNotRunningException.  This is thrown
+      // between report of regionserver being up and its being ready for RPCs.
       long maxWaitTime = System.currentTimeMillis() +
         this.master.getConfiguration().
           getLong("hbase.regionserver.rpc.startup.waittime", 60000);
@@ -962,7 +1001,7 @@ public class AssignmentManager extends Z
     } catch (InterruptedException e) {
       throw new RuntimeException(e);
     }
-    LOG.debug("Bulk assigning done for " + destination.getServerName());
+    LOG.debug("Bulk assigning done for " + destination.toString());
   }
 
   /**
@@ -971,11 +1010,11 @@ public class AssignmentManager extends Z
   static class CreateUnassignedAsyncCallback implements AsyncCallback.StringCallback {
     private final Log LOG = LogFactory.getLog(CreateUnassignedAsyncCallback.class);
     private final ZooKeeperWatcher zkw;
-    private final HServerInfo destination;
+    private final ServerName destination;
     private final AtomicInteger counter;
 
     CreateUnassignedAsyncCallback(final ZooKeeperWatcher zkw,
-        final HServerInfo destination, final AtomicInteger counter) {
+        final ServerName destination, final AtomicInteger counter) {
       this.zkw = zkw;
       this.destination = destination;
       this.counter = counter;
@@ -991,7 +1030,7 @@ public class AssignmentManager extends Z
           ", rc=" + rc, null);
         return;
       }
-      LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.getServerName());
+      LOG.debug("rs=" + (RegionState)ctx + ", server=" + this.destination.toString());
       // Async exists to set a watcher so we'll get triggered when
       // unassigned node changes.
       this.zkw.getZooKeeper().exists(path, this.zkw,
@@ -1078,7 +1117,7 @@ public class AssignmentManager extends Z
       if (plan == null) return; // Should get reassigned later when RIT times out.
       try {
         LOG.debug("Assigning region " + state.getRegion().getRegionNameAsString() +
-          " to " + plan.getDestination().getServerName());
+          " to " + plan.getDestination().toString());
         // Transition RegionState to PENDING_OPEN
         state.update(RegionState.State.PENDING_OPEN);
         // Send OPEN RPC. This can fail if the server on other end is is not up.
@@ -1118,7 +1157,7 @@ public class AssignmentManager extends Z
     state.update(RegionState.State.OFFLINE);
     try {
       if(!ZKAssign.createOrForceNodeOffline(master.getZooKeeper(),
-          state.getRegion(), master.getServerName())) {
+          state.getRegion(), this.master.getServerName())) {
         LOG.warn("Attempted to create/force node into OFFLINE state before " +
           "completing assignment but failed to do so for " + state);
         return false;
@@ -1147,7 +1186,7 @@ public class AssignmentManager extends Z
     state.update(RegionState.State.OFFLINE);
     try {
       ZKAssign.asyncCreateNodeOffline(master.getZooKeeper(), state.getRegion(),
-        master.getServerName(), cb, ctx);
+        this.master.getServerName(), cb, ctx);
     } catch (KeeperException e) {
       master.abort("Unexpected ZK exception creating/setting node OFFLINE", e);
       return false;
@@ -1175,10 +1214,10 @@ public class AssignmentManager extends Z
    * if no servers to assign, it returns null).
    */
   RegionPlan getRegionPlan(final RegionState state,
-      final HServerInfo serverToExclude, final boolean forceNewPlan) {
+      final ServerName serverToExclude, final boolean forceNewPlan) {
     // Pickup existing plan or make a new one
     String encodedName = state.getRegion().getEncodedName();
-    List<HServerInfo> servers = this.serverManager.getOnlineServersList();
+    List<ServerName> servers = this.serverManager.getOnlineServersList();
     // The remove below hinges on the fact that the call to
     // serverManager.getOnlineServersList() returns a copy
     if (serverToExclude != null) servers.remove(serverToExclude);
@@ -1266,7 +1305,7 @@ public class AssignmentManager extends Z
       }
     }
     // Send CLOSE RPC
-    HServerInfo server = null;
+    ServerName server = null;
     synchronized (this.regions) {
       server = regions.get(region);
     }
@@ -1347,6 +1386,29 @@ public class AssignmentManager extends Z
    * Assigns all user regions, if any.  Used during cluster startup.
    * <p>
    * This is a synchronous call and will return once every region has been
+   * assigned.  If anything fails, an exception is thrown.
+   * @throws InterruptedException
+   * @throws IOException
+   */
+  public void assignUserRegions(List<HRegionInfo> regions, List<ServerName> servers)
+  throws IOException, InterruptedException {
+    if (regions == null)
+      return;
+    Map<ServerName, List<HRegionInfo>> bulkPlan = null;
+    // Generate a round-robin bulk assignment plan
+    bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers);
+    LOG.info("Bulk assigning " + regions.size() + " region(s) round-robin across " +
+               servers.size() + " server(s)");
+    // Use fixed count thread pool assigning.
+    BulkAssigner ba = new StartupBulkAssigner(this.master, bulkPlan, this);
+    ba.bulkAssign();
+    LOG.info("Bulk assigning done");
+  }
+
+  /**
+   * Assigns all user regions, if any exist.  Used during cluster startup.
+   * <p>
+   * This is a synchronous call and will return once every region has been
    * assigned.  If anything fails, an exception is thrown and the cluster
    * should be shutdown.
    * @throws InterruptedException
@@ -1354,10 +1416,10 @@ public class AssignmentManager extends Z
    */
   public void assignAllUserRegions() throws IOException, InterruptedException {
     // Get all available servers
-    List<HServerInfo> servers = serverManager.getOnlineServersList();
+    List<ServerName> servers = serverManager.getOnlineServersList();
 
     // Scan META for all user regions, skipping any disabled tables
-    Map<HRegionInfo,HServerAddress> allRegions =
+    Map<HRegionInfo, ServerName> allRegions =
       MetaReader.fullScan(catalogTracker, this.zkTable.getDisabledTables(), true);
     if (allRegions == null || allRegions.isEmpty()) return;
 
@@ -1365,15 +1427,14 @@ public class AssignmentManager extends Z
     boolean retainAssignment = master.getConfiguration().
       getBoolean("hbase.master.startup.retainassign", true);
 
-    Map<HServerInfo, List<HRegionInfo>> bulkPlan = null;
+    Map<ServerName, List<HRegionInfo>> bulkPlan = null;
     if (retainAssignment) {
       // Reuse existing assignment info
       bulkPlan = LoadBalancer.retainAssignment(allRegions, servers);
     } else {
       // assign regions in round-robin fashion
-      HRegionInfo [] regions =
-        allRegions.keySet().toArray(new HRegionInfo[allRegions.size()]);
-      bulkPlan = LoadBalancer.roundRobinAssignment(regions, servers);
+      assignUserRegions(new ArrayList<HRegionInfo>(allRegions.keySet()), servers);
+      return;
     }
     LOG.info("Bulk assigning " + allRegions.size() + " region(s) across " +
       servers.size() + " server(s), retainAssignment=" + retainAssignment);
@@ -1391,11 +1452,11 @@ public class AssignmentManager extends Z
    * which will abort the Server if exception.
    */
   static class StartupBulkAssigner extends BulkAssigner {
-    final Map<HServerInfo, List<HRegionInfo>> bulkPlan;
+    final Map<ServerName, List<HRegionInfo>> bulkPlan;
     final AssignmentManager assignmentManager;
 
     StartupBulkAssigner(final Server server,
-        final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
+        final Map<ServerName, List<HRegionInfo>> bulkPlan,
         final AssignmentManager am) {
       super(server);
       this.bulkPlan = bulkPlan;
@@ -1421,9 +1482,9 @@ public class AssignmentManager extends Z
 
     @Override
     protected void populatePool(java.util.concurrent.ExecutorService pool) {
-      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
+      for (Map.Entry<ServerName, List<HRegionInfo>> e: this.bulkPlan.entrySet()) {
         pool.execute(new SingleServerBulkAssigner(e.getKey(), e.getValue(),
-          this.assignmentManager, true));
+          this.assignmentManager));
       }
     }
 
@@ -1456,7 +1517,7 @@ public class AssignmentManager extends Z
    */
   static class GeneralBulkAssigner extends StartupBulkAssigner {
     GeneralBulkAssigner(final Server server,
-        final Map<HServerInfo, List<HRegionInfo>> bulkPlan,
+        final Map<ServerName, List<HRegionInfo>> bulkPlan,
         final AssignmentManager am) {
       super(server, bulkPlan, am);
     }
@@ -1476,13 +1537,12 @@ public class AssignmentManager extends Z
    * Manage bulk assigning to a server.
    */
   static class SingleServerBulkAssigner implements Runnable {
-    private final HServerInfo regionserver;
+    private final ServerName regionserver;
     private final List<HRegionInfo> regions;
     private final AssignmentManager assignmentManager;
 
-    SingleServerBulkAssigner(final HServerInfo regionserver,
-        final List<HRegionInfo> regions, final AssignmentManager am,
-        final boolean startUp) {
+    SingleServerBulkAssigner(final ServerName regionserver,
+        final List<HRegionInfo> regions, final AssignmentManager am) {
       this.regionserver = regionserver;
       this.regions = regions;
       this.assignmentManager = am;
@@ -1562,28 +1622,26 @@ public class AssignmentManager extends Z
    *         in META
    * @throws IOException
    */
-  private Map<HServerInfo,List<Pair<HRegionInfo,Result>>> rebuildUserRegions()
+  Map<ServerName, List<Pair<HRegionInfo, Result>>> rebuildUserRegions()
   throws IOException {
     // Region assignment from META
-    List<Result> results = MetaReader.fullScanOfResults(catalogTracker);
+    List<Result> results = MetaReader.fullScanOfResults(this.catalogTracker);
     // Map of offline servers and their regions to be returned
-    Map<HServerInfo,List<Pair<HRegionInfo,Result>>> offlineServers =
-      new TreeMap<HServerInfo,List<Pair<HRegionInfo,Result>>>();
+    Map<ServerName, List<Pair<HRegionInfo,Result>>> offlineServers =
+      new TreeMap<ServerName, List<Pair<HRegionInfo, Result>>>();
     // Iterate regions in META
     for (Result result : results) {
-      Pair<HRegionInfo,HServerInfo> region =
-        MetaReader.metaRowToRegionPairWithInfo(result);
+      Pair<HRegionInfo, ServerName> region = MetaReader.metaRowToRegionPair(result);
       if (region == null) continue;
-      HServerInfo regionLocation = region.getSecond();
       HRegionInfo regionInfo = region.getFirst();
+      ServerName regionLocation = region.getSecond();
       if (regionLocation == null) {
         // Region not being served, add to region map with no assignment
         // If this needs to be assigned out, it will also be in ZK as RIT
         this.regions.put(regionInfo, null);
-      } else if (!serverManager.isServerOnline(
-          regionLocation.getServerName())) {
+      } else if (!this.serverManager.isServerOnline(regionLocation)) {
         // Region is located on a server that isn't online
-        List<Pair<HRegionInfo,Result>> offlineRegions =
+        List<Pair<HRegionInfo, Result>> offlineRegions =
           offlineServers.get(regionLocation);
         if (offlineRegions == null) {
           offlineRegions = new ArrayList<Pair<HRegionInfo,Result>>(1);
@@ -1592,7 +1650,7 @@ public class AssignmentManager extends Z
         offlineRegions.add(new Pair<HRegionInfo,Result>(regionInfo, result));
       } else {
         // Region is being served and on an active server
-        regions.put(regionInfo, regionLocation);
+        this.regions.put(regionInfo, regionLocation);
         addToServers(regionLocation, regionInfo);
       }
     }
@@ -1613,9 +1671,9 @@ public class AssignmentManager extends Z
    * @throws KeeperException
    */
   private void processDeadServers(
-      Map<HServerInfo, List<Pair<HRegionInfo, Result>>> deadServers)
+      Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
   throws IOException, KeeperException {
-    for (Map.Entry<HServerInfo, List<Pair<HRegionInfo,Result>>> deadServer :
+    for (Map.Entry<ServerName, List<Pair<HRegionInfo,Result>>> deadServer:
       deadServers.entrySet()) {
       List<Pair<HRegionInfo,Result>> regions = deadServer.getValue();
       for (Pair<HRegionInfo,Result> region : regions) {
@@ -1624,7 +1682,7 @@ public class AssignmentManager extends Z
         // If region was in transition (was in zk) force it offline for reassign
         try {
           ZKAssign.createOrForceNodeOffline(watcher, regionInfo,
-              master.getServerName());
+            this.master.getServerName());
         } catch (KeeperException.NoNodeException nne) {
           // This is fine
         }
@@ -1640,11 +1698,11 @@ public class AssignmentManager extends Z
    * @param hsi
    * @param hri
    */
-  private void addToServers(final HServerInfo hsi, final HRegionInfo hri) {
-    List<HRegionInfo> hris = servers.get(hsi);
+  private void addToServers(final ServerName sn, final HRegionInfo hri) {
+    List<HRegionInfo> hris = servers.get(sn);
     if (hris == null) {
       hris = new ArrayList<HRegionInfo>();
-      servers.put(hsi, hris);
+      servers.put(sn, hris);
     }
     hris.add(hri);
   }
@@ -1857,7 +1915,7 @@ public class AssignmentManager extends Z
                   try {
                     data = new RegionTransitionData(
                       EventType.M_ZK_REGION_OFFLINE, regionInfo.getRegionName(),
-                      master.getServerName());
+                        master.getServerName());
                     if (ZKUtil.setData(watcher, node, data.getBytes(),
                         stat.getVersion())) {
                       // Node is now OFFLINE, let's trigger another assignment
@@ -1922,16 +1980,16 @@ public class AssignmentManager extends Z
 
   /**
    * Process shutdown server removing any assignments.
-   * @param hsi Server that went down.
+   * @param sn Server that went down.
    * @return list of regions in transition on this server
    */
-  public List<RegionState> processServerShutdown(final HServerInfo hsi) {
+  public List<RegionState> processServerShutdown(final ServerName sn) {
     // Clean out any existing assignment plans for this server
     synchronized (this.regionPlans) {
       for (Iterator <Map.Entry<String, RegionPlan>> i =
           this.regionPlans.entrySet().iterator(); i.hasNext();) {
         Map.Entry<String, RegionPlan> e = i.next();
-        if (e.getValue().getDestination().equals(hsi)) {
+        if (e.getValue().getDestination().equals(sn)) {
           // Use iterator's remove else we'll get CME
           i.remove();
         }
@@ -1943,7 +2001,7 @@ public class AssignmentManager extends Z
     Set<HRegionInfo> deadRegions = null;
     List<RegionState> rits = new ArrayList<RegionState>();
     synchronized (this.regions) {
-      List<HRegionInfo> assignedRegions = this.servers.remove(hsi);
+      List<HRegionInfo> assignedRegions = this.servers.remove(sn);
       if (assignedRegions == null || assignedRegions.isEmpty()) {
         // No regions on this server, we are done, return empty list of RITs
         return rits;
@@ -1968,16 +2026,16 @@ public class AssignmentManager extends Z
 
   /**
    * Update inmemory structures.
-   * @param hsi Server that reported the split
+   * @param sn Server that reported the split
    * @param parent Parent region that was split
    * @param a Daughter region A
    * @param b Daughter region B
    */
-  public void handleSplitReport(final HServerInfo hsi, final HRegionInfo parent,
+  public void handleSplitReport(final ServerName sn, final HRegionInfo parent,
       final HRegionInfo a, final HRegionInfo b) {
     regionOffline(parent);
-    regionOnline(a, hsi);
-    regionOnline(b, hsi);
+    regionOnline(a, sn);
+    regionOnline(b, sn);
 
     // There's a possibility that the region was splitting while a user asked
     // the master to disable, we need to make sure we close those regions in
@@ -1995,21 +2053,16 @@ public class AssignmentManager extends Z
    * If a new server has come in and it has no regions, it will not be included
    * in the returned Map.
    */
-  Map<HServerInfo, List<HRegionInfo>> getAssignments() {
+  Map<ServerName, List<HRegionInfo>> getAssignments() {
     // This is an EXPENSIVE clone.  Cloning though is the safest thing to do.
     // Can't let out original since it can change and at least the loadbalancer
     // wants to iterate this exported list.  We need to synchronize on regions
     // since all access to this.servers is under a lock on this.regions.
-    Map<HServerInfo, List<HRegionInfo>> result = null;
+    Map<ServerName, List<HRegionInfo>> result = null;
     synchronized (this.regions) {
-      result = new HashMap<HServerInfo, List<HRegionInfo>>(this.servers.size());
-      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.servers.entrySet()) {
-        List<HRegionInfo> shallowCopy = new ArrayList<HRegionInfo>(e.getValue());
-        HServerInfo clone = new HServerInfo(e.getKey());
-        // Set into server load the number of regions this server is carrying
-        // The load balancer calculation needs it at least and its handy.
-        clone.getLoad().setNumberOfRegions(e.getValue().size());
-        result.put(clone, shallowCopy);
+      result = new HashMap<ServerName, List<HRegionInfo>>(this.servers.size());
+      for (Map.Entry<ServerName, List<HRegionInfo>> e: this.servers.entrySet()) {
+        result.put(e.getKey(), new ArrayList<HRegionInfo>(e.getValue()));
       }
     }
     return result;
@@ -2018,14 +2071,14 @@ public class AssignmentManager extends Z
   /**
    * @param encodedRegionName Region encoded name.
    * @return Null or a {@link Pair} instance that holds the full {@link HRegionInfo}
-   * and the hosting servers {@link HServerInfo}.
+   * and the hosting servers {@link ServerName}.
    */
-  Pair<HRegionInfo, HServerInfo> getAssignment(final byte [] encodedRegionName) {
+  Pair<HRegionInfo, ServerName> getAssignment(final byte [] encodedRegionName) {
     String name = Bytes.toString(encodedRegionName);
     synchronized(this.regions) {
-      for (Map.Entry<HRegionInfo, HServerInfo> e: this.regions.entrySet()) {
+      for (Map.Entry<HRegionInfo, ServerName> e: this.regions.entrySet()) {
         if (e.getKey().getEncodedName().equals(name)) {
-          return new Pair<HRegionInfo, HServerInfo>(e.getKey(), e.getValue());
+          return new Pair<HRegionInfo, ServerName>(e.getKey(), e.getValue());
         }
       }
     }
@@ -2043,28 +2096,12 @@ public class AssignmentManager extends Z
   }
 
   /**
-   * @param hsi
-   * @return True if this server is carrying a catalog region, a region from
-   * -ROOT- or .META. table.
-   */
-  boolean isMetaRegionServer(final HServerInfo hsi) {
-    synchronized (this.regions) {
-      List<HRegionInfo> regions = this.servers.get(hsi);
-      if (regions == null || regions.isEmpty()) return false;
-      for (HRegionInfo hri: regions) {
-        if (hri.isMetaRegion()) return true;
-      }
-    }
-    return false;
-  }
-
-  /**
    * Run through remaining regionservers and unassign all catalog regions.
    */
   void unassignCatalogRegions() {
     this.servers.entrySet();
     synchronized (this.regions) {
-      for (Map.Entry<HServerInfo, List<HRegionInfo>> e: this.servers.entrySet()) {
+      for (Map.Entry<ServerName, List<HRegionInfo>> e: this.servers.entrySet()) {
         List<HRegionInfo> regions = e.getValue();
         if (regions == null || regions.isEmpty()) continue;
         for (HRegionInfo hri: regions) {
@@ -2084,10 +2121,10 @@ public class AssignmentManager extends Z
    * @throws IOException
    */
   void bulkAssignUserRegions(final HRegionInfo [] regions,
-      final List<HServerInfo> servers, final boolean sync)
+      final List<ServerName> servers, final boolean sync)
   throws IOException {
-    Map<HServerInfo, List<HRegionInfo>> bulkPlan =
-      LoadBalancer.roundRobinAssignment(regions, servers);
+    Map<ServerName, List<HRegionInfo>> bulkPlan =
+      LoadBalancer.roundRobinAssignment(Arrays.asList(regions), servers);
     LOG.info("Bulk assigning " + regions.length + " region(s) " +
       "round-robin across " + servers.size() + " server(s)");
     // Use fixed count thread pool assigning.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/DeadServer.java Wed Apr 27 23:12:42 2011
@@ -22,17 +22,15 @@ package org.apache.hadoop.hbase.master;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.NotImplementedException;
-import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.ServerName;
 
 /**
  * Class to hold dead servers list and utility querying dead server list.
  */
-public class DeadServer implements Set<String> {
+public class DeadServer implements Set<ServerName> {
   /**
    * Set of known dead servers.  On znode expiration, servers are added here.
    * This is needed in case of a network partitioning where the server's lease
@@ -40,26 +38,22 @@ public class DeadServer implements Set<S
    * and it's server logs are recovered, it will be told to call server startup
    * because by then, its regions have probably been reassigned.
    */
-  private final Set<String> deadServers = new HashSet<String>();
-
-  /** Maximum number of dead servers to keep track of */
-  private final int maxDeadServers;
+  private final Set<ServerName> deadServers = new HashSet<ServerName>();
 
   /** Number of dead servers currently being processed */
   private int numProcessing;
 
-  public DeadServer(int maxDeadServers) {
+  public DeadServer() {
     super();
-    this.maxDeadServers = maxDeadServers;
     this.numProcessing = 0;
   }
 
   /**
-   * @param serverName
+   * @param serverName Server name
    * @return true if server is dead
    */
   public boolean isDeadServer(final String serverName) {
-    return isDeadServer(serverName, false);
+    return isDeadServer(new ServerName(serverName));
   }
 
   /**
@@ -68,31 +62,27 @@ public class DeadServer implements Set<S
    * <code>host,port,startcode</code>.
    * @return true if this server was dead before and coming back alive again
    */
-  public boolean cleanPreviousInstance(final String newServerName) {
+  public boolean cleanPreviousInstance(final ServerName newServerName) {
+    ServerName sn =
+      ServerName.findServerWithSameHostnamePort(this.deadServers, newServerName);
+    if (sn == null) return false;
+    return this.deadServers.remove(sn);
+  }
 
-    String serverAddress =
-        HServerInfo.getServerNameLessStartCode(newServerName);
-    for (String serverName: deadServers) {
-      String deadServerAddress =
-          HServerInfo.getServerNameLessStartCode(serverName);
-      if (deadServerAddress.equals(serverAddress)) {
-        remove(serverName);
-        return true;
-      }
-    }
-    return false;
+  /**
+   * @param serverName
+   * @return true if this server is on the dead servers list.
+   */
+  boolean isDeadServer(final ServerName serverName) {
+    return this.deadServers.contains(serverName);
   }
 
   /**
-   * @param serverName Servername as either <code>host:port</code> or
-   * <code>host,port,startcode</code>.
-   * @param hostAndPortOnly True if <code>serverName</code> is host and
-   * port only (<code>host:port</code>) and if so, then we do a prefix compare
-   * (ignoring start codes) looking for dead server.
-   * @return true if server is dead
+   * @return True if we have a server with matching hostname and port.
    */
-  boolean isDeadServer(final String serverName, final boolean hostAndPortOnly) {
-    return HServerInfo.isServer(this, serverName, hostAndPortOnly);
+  boolean isDeadServerWithSameHostnamePort(final ServerName serverName) {
+    return ServerName.findServerWithSameHostnamePort(this.deadServers,
+      serverName) != null;
   }
 
   /**
@@ -105,18 +95,18 @@ public class DeadServer implements Set<S
     return numProcessing != 0;
   }
 
-  public synchronized Set<String> clone() {
-    Set<String> clone = new HashSet<String>(this.deadServers.size());
+  public synchronized Set<ServerName> clone() {
+    Set<ServerName> clone = new HashSet<ServerName>(this.deadServers.size());
     clone.addAll(this.deadServers);
     return clone;
   }
 
-  public synchronized boolean add(String e) {
+  public synchronized boolean add(ServerName e) {
     this.numProcessing++;
     return deadServers.add(e);
   }
 
-  public synchronized void finish(String e) {
+  public synchronized void finish(ServerName e) {
     this.numProcessing--;
   }
 
@@ -132,7 +122,7 @@ public class DeadServer implements Set<S
     return deadServers.contains(o);
   }
 
-  public Iterator<String> iterator() {
+  public Iterator<ServerName> iterator() {
     return this.deadServers.iterator();
   }
 
@@ -152,7 +142,7 @@ public class DeadServer implements Set<S
     return deadServers.containsAll(c);
   }
 
-  public synchronized boolean addAll(Collection<? extends String> c) {
+  public synchronized boolean addAll(Collection<? extends ServerName> c) {
     return deadServers.addAll(c);
   }
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1097275&r1=1097274&r2=1097275&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Wed Apr 27 23:12:42 2011
@@ -23,8 +23,8 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicReference;
@@ -36,14 +36,13 @@ import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.HServerLoad;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.NotAllMetaRegionsOnlineException;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
@@ -54,8 +53,8 @@ import org.apache.hadoop.hbase.catalog.M
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.MetaScanner;
-import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
@@ -131,8 +130,12 @@ implements HMasterInterface, HMasterRegi
 
   // RPC server for the HMaster
   private final RpcServer rpcServer;
-  // Address of the HMaster
-  private final HServerAddress address;
+
+  /**
+   * This server's RPC listener address (as bound by {@link #rpcServer}).
+   */
+  private final InetSocketAddress isa;
+
   // Metrics for the HMaster
   private final MasterMetrics metrics;
   // file system manager for the master FS operations
@@ -172,6 +175,7 @@ implements HMasterInterface, HMasterRegi
   private LogCleaner logCleaner;
 
   private MasterCoprocessorHost cpHost;
+  private final ServerName serverName;
 
   /**
    * Initializes the HMaster. The steps are as follows:
@@ -189,43 +193,48 @@ implements HMasterInterface, HMasterRegi
   throws IOException, KeeperException, InterruptedException {
     this.conf = conf;
 
-    /*
-     * Determine address and initialize RPC server (but do not start).
-     * The RPC server ports can be ephemeral. Create a ZKW instance.
-     */
-    HServerAddress a = new HServerAddress(getMyAddress(this.conf));
-    int numHandlers = conf.getInt("hbase.regionserver.handler.count", 10);
+    // Determine the host/port the client-facing RPC server will bind to.
+    String hostname = DNS.getDefaultHost(
+      conf.get("hbase.master.dns.interface", "default"),
+      conf.get("hbase.master.dns.nameserver", "default"));
+    int port = conf.getInt(HConstants.MASTER_PORT, HConstants.DEFAULT_MASTER_PORT);
+    // Creating an InetSocketAddress attempts a resolve; null address = failed.
+    InetSocketAddress initialIsa = new InetSocketAddress(hostname, port);
+    if (initialIsa.getAddress() == null) {
+      throw new IllegalArgumentException("Failed resolve of " + initialIsa);
+    }
+    int numHandlers = conf.getInt("hbase.master.handler.count",
+      conf.getInt("hbase.regionserver.handler.count", 25));
     this.rpcServer = HBaseRPC.getServer(this,
       new Class<?>[]{HMasterInterface.class, HMasterRegionInterface.class},
-      a.getBindAddress(), a.getPort(),
-      numHandlers,
-      0, // we dont use high priority handlers in master
-      false, conf,
-      0); // this is a DNC w/o high priority handlers
-    this.address = new HServerAddress(rpcServer.getListenerAddress());
+        initialIsa.getHostName(), // BindAddress is IP we got for this server.
+        initialIsa.getPort(),
+        numHandlers,
+        0, // we dont use high priority handlers in master
+        conf.getBoolean("hbase.rpc.verbose", false), conf,
+        0); // this is a DNC w/o high priority handlers
+    // Set our address.
+    this.isa = this.rpcServer.getListenerAddress();
+    this.serverName = new ServerName(this.isa.getHostName(),
+      this.isa.getPort(), System.currentTimeMillis());
 
     // initialize server principal (if using secure Hadoop)
     User.login(conf, "hbase.master.keytab.file",
-        "hbase.master.kerberos.principal", this.address.getHostname());
+      "hbase.master.kerberos.principal", this.isa.getHostName());
 
     // set the thread name now we have an address
-    setName(MASTER + "-" + this.address);
+    setName(MASTER + "-" + this.serverName.toString());
 
     Replication.decorateMasterConfiguration(this.conf);
-
     this.rpcServer.startThreads();
 
     // Hack! Maps DFSClient => Master for logs.  HDFS made this
     // config param for task trackers, but we can piggyback off of it.
     if (this.conf.get("mapred.task.id") == null) {
-      this.conf.set("mapred.task.id", "hb_m_" + this.address.toString() +
-        "_" + System.currentTimeMillis());
+      this.conf.set("mapred.task.id", "hb_m_" + this.serverName.toString());
     }
-
-    this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" +
-        address.getPort(), this);
-
-    this.metrics = new MasterMetrics(getServerName());
+    this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this);
+    this.metrics = new MasterMetrics(getServerName().toString());
   }
 
   /**
@@ -397,9 +406,9 @@ implements HMasterInterface, HMasterRegi
         fileSystemManager.getClusterId());
 
     this.connection = HConnectionManager.getConnection(conf);
-    this.executorService = new ExecutorService(getServerName());
+    this.executorService = new ExecutorService(getServerName().toString());
 
-    this.serverManager = new ServerManager(this, this, metrics);
+    this.serverManager = new ServerManager(this, this);
 
     initializeZKBasedSystemTrackers();
 
@@ -409,29 +418,25 @@ implements HMasterInterface, HMasterRegi
     // start up all service threads.
     startServiceThreads();
 
-    // Wait for region servers to report in.  Returns count of regions.
-    int regionCount = this.serverManager.waitForRegionServers();
+    // Wait for region servers to report in.
+    this.serverManager.waitForRegionServers();
+    // Check zk for regionservers that are up but didn't register
+    for (ServerName sn: this.regionServerTracker.getOnlineServers()) {
+      if (!this.serverManager.isServerOnline(sn)) {
+        // Not registered; add it.
+        LOG.info("Registering server found up in zk: " + sn);
+        this.serverManager.recordNewServer(sn, HServerLoad.EMPTY_HSERVERLOAD);
+      }
+    }
 
     // TODO: Should do this in background rather than block master startup
     this.fileSystemManager.
-      splitLogAfterStartup(this.serverManager.getOnlineServers());
+      splitLogAfterStartup(this.serverManager.getOnlineServers().keySet());
 
     // Make sure root and meta assigned before proceeding.
     assignRootAndMeta();
-
-    // Is this fresh start with no regions assigned or are we a master joining
-    // an already-running cluster?  If regionsCount == 0, then for sure a
-    // fresh start.  TOOD: Be fancier.  If regionsCount == 2, perhaps the
-    // 2 are .META. and -ROOT- and we should fall into the fresh startup
-    // branch below.  For now, do processFailover.
-    if (regionCount == 0) {
-      LOG.info("Master startup proceeding: cluster startup");
-      this.assignmentManager.cleanoutUnassigned();
-      this.assignmentManager.assignAllUserRegions();
-    } else {
-      LOG.info("Master startup proceeding: master failover");
-      this.assignmentManager.processFailover();
-    }
+    // Fixup assignment manager status
+    this.assignmentManager.joinCluster();
 
     // Start balancer and meta catalog janitor after meta and regions have
     // been assigned.
@@ -466,7 +471,7 @@ implements HMasterInterface, HMasterRegi
     } else {
       // Region already assigned.  We didnt' assign it.  Add to in-memory state.
       this.assignmentManager.regionOnline(HRegionInfo.ROOT_REGIONINFO,
-        this.serverManager.getHServerInfo(this.catalogTracker.getRootLocation()));
+        this.catalogTracker.getRootLocation());
     }
     LOG.info("-ROOT- assigned=" + assigned + ", rit=" + rit +
       ", location=" + catalogTracker.getRootLocation());
@@ -484,32 +489,13 @@ implements HMasterInterface, HMasterRegi
     } else {
       // Region already assigned.  We didnt' assign it.  Add to in-memory state.
       this.assignmentManager.regionOnline(HRegionInfo.FIRST_META_REGIONINFO,
-        this.serverManager.getHServerInfo(this.catalogTracker.getMetaLocation()));
+        this.catalogTracker.getMetaLocation());
     }
     LOG.info(".META. assigned=" + assigned + ", rit=" + rit +
       ", location=" + catalogTracker.getMetaLocation());
     return assigned;
   }
 
-  /*
-   * @return This masters' address.
-   * @throws UnknownHostException
-   */
-  private static String getMyAddress(final Configuration c)
-  throws UnknownHostException {
-    // Find out our address up in DNS.
-    String s = DNS.getDefaultHost(c.get("hbase.master.dns.interface","default"),
-      c.get("hbase.master.dns.nameserver","default"));
-    s += ":" + c.get(HConstants.MASTER_PORT,
-        Integer.toString(HConstants.DEFAULT_MASTER_PORT));
-    return s;
-  }
-
-  /** @return HServerAddress of the master server */
-  public HServerAddress getMasterAddress() {
-    return this.address;
-  }
-
   public long getProtocolVersion(String protocol, long clientVersion) {
     if (HMasterInterface.class.getName().equals(protocol)) {
       return HMasterInterface.VERSION;
@@ -645,25 +631,16 @@ implements HMasterInterface, HMasterRegi
   }
 
   @Override
-  public MapWritable regionServerStartup(final HServerInfo serverInfo,
-    final long serverCurrentTime)
+  public MapWritable regionServerStartup(final int port,
+    final long serverStartCode, final long serverCurrentTime)
   throws IOException {
-    // Set the ip into the passed in serverInfo.  Its ip is more than likely
-    // not the ip that the master sees here.  See at end of this method where
-    // we pass it back to the regionserver by setting "hbase.regionserver.address"
-    // Everafter, the HSI combination 'server name' is what uniquely identifies
-    // the incoming RegionServer.
-    InetSocketAddress address = new InetSocketAddress(
-        HBaseServer.getRemoteIp().getHostName(),
-        serverInfo.getServerAddress().getPort());
-    serverInfo.setServerAddress(new HServerAddress(address));
-
     // Register with server manager
-    this.serverManager.regionServerStartup(serverInfo, serverCurrentTime);
+    this.serverManager.regionServerStartup(HBaseServer.getRemoteIp(), port,
+      serverStartCode, serverCurrentTime);
     // Send back some config info
     MapWritable mw = createConfigurationSubset();
-     mw.put(new Text("hbase.regionserver.address"),
-         serverInfo.getServerAddress());
+    mw.put(new Text(HConstants.KEY_FOR_HOSTNAME_SEEN_BY_MASTER),
+      new Text(HBaseServer.getRemoteIp().getHostName())); // the RS's host, not ours
     return mw;
   }
 
@@ -682,23 +659,13 @@ implements HMasterInterface, HMasterRegi
   }
 
   @Override
-  public HMsg [] regionServerReport(HServerInfo serverInfo, HMsg msgs[],
-    HRegionInfo[] mostLoadedRegions)
+  public void regionServerReport(byte[] sn, HServerLoad hsl)
   throws IOException {
-    return adornRegionServerAnswer(serverInfo,
-      this.serverManager.regionServerReport(serverInfo, msgs, mostLoadedRegions));
-  }
-
-  /**
-   * Override if you'd add messages to return to regionserver <code>hsi</code>
-   * or to send an exception.
-   * @param msgs Messages to add to
-   * @return Messages to return to
-   * @throws IOException exceptions that were injected for the region servers
-   */
-  protected HMsg [] adornRegionServerAnswer(final HServerInfo hsi,
-      final HMsg [] msgs) throws IOException {
-    return msgs;
+    this.serverManager.regionServerReport(new ServerName(sn), hsl);
+    if (hsl != null && this.metrics != null) {
+      // Up our metrics.
+      this.metrics.incrementRequests(hsl.getNumberOfRequests());
+    }
   }
 
   public boolean isMasterRunning() {
@@ -758,14 +725,13 @@ implements HMasterInterface, HMasterRegi
         }
       }
 
-      Map<HServerInfo, List<HRegionInfo>> assignments =
+      Map<ServerName, List<HRegionInfo>> assignments =
         this.assignmentManager.getAssignments();
       // Returned Map from AM does not include mention of servers w/o assignments.
-      for (Map.Entry<String, HServerInfo> e:
+      for (Map.Entry<ServerName, HServerLoad> e:
           this.serverManager.getOnlineServers().entrySet()) {
-        HServerInfo hsi = e.getValue();
-        if (!assignments.containsKey(hsi)) {
-          assignments.put(hsi, new ArrayList<HRegionInfo>());
+        if (!assignments.containsKey(e.getKey())) {
+          assignments.put(e.getKey(), new ArrayList<HRegionInfo>());
         }
       }
       List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
@@ -832,12 +798,12 @@ implements HMasterInterface, HMasterRegi
   @Override
   public void move(final byte[] encodedRegionName, final byte[] destServerName)
   throws UnknownRegionException {
-    Pair<HRegionInfo, HServerInfo> p =
+    Pair<HRegionInfo, ServerName> p =
       this.assignmentManager.getAssignment(encodedRegionName);
     if (p == null)
       throw new UnknownRegionException(Bytes.toString(encodedRegionName));
     HRegionInfo hri = p.getFirst();
-    HServerInfo dest = null;
+    ServerName dest = null;
     if (destServerName == null || destServerName.length == 0) {
       LOG.info("Passed destination servername is null/empty so " +
         "choosing a server at random");
@@ -845,12 +811,12 @@ implements HMasterInterface, HMasterRegi
       // Unassign will reassign it elsewhere choosing random server.
       this.assignmentManager.unassign(hri);
     } else {
-      dest = this.serverManager.getServerInfo(new String(destServerName));
-
+      dest = new ServerName(Bytes.toString(destServerName));
       if (this.cpHost != null) {
         this.cpHost.preMove(p.getFirst(), p.getSecond(), dest);
       }
       RegionPlan rp = new RegionPlan(p.getFirst(), p.getSecond(), dest);
+      LOG.info("Added move plan " + rp + ", running balancer");
       this.assignmentManager.balance(rp);
       if (this.cpHost != null) {
         this.cpHost.postMove(p.getFirst(), p.getSecond(), dest);
@@ -928,8 +894,13 @@ implements HMasterInterface, HMasterRegi
     }
 
     // 5. Trigger immediate assignment of the regions in round-robin fashion
-    List<HServerInfo> servers = serverManager.getOnlineServersList();
-    this.assignmentManager.bulkAssignUserRegions(newRegions, servers, sync);
+    List<ServerName> servers = serverManager.getOnlineServersList();
+    try {
+      this.assignmentManager.assignUserRegions(Arrays.asList(newRegions), servers);
+    } catch (InterruptedException ie) {
+      LOG.error("Caught " + ie + " during round-robin assignment");
+      throw new IOException(ie);
+    }
 
     // 6. If sync, wait for assignment of regions
     if (sync) {
@@ -1027,11 +998,11 @@ implements HMasterInterface, HMasterRegi
    * is found, but not currently deployed, the second element of the pair
    * may be null.
    */
-  Pair<HRegionInfo,HServerAddress> getTableRegionForRow(
+  Pair<HRegionInfo, ServerName> getTableRegionForRow(
       final byte [] tableName, final byte [] rowKey)
   throws IOException {
-    final AtomicReference<Pair<HRegionInfo, HServerAddress>> result =
-      new AtomicReference<Pair<HRegionInfo, HServerAddress>>(null);
+    final AtomicReference<Pair<HRegionInfo, ServerName>> result =
+      new AtomicReference<Pair<HRegionInfo, ServerName>>(null);
 
     MetaScannerVisitor visitor =
       new MetaScannerVisitor() {
@@ -1040,13 +1011,11 @@ implements HMasterInterface, HMasterRegi
           if (data == null || data.size() <= 0) {
             return true;
           }
-          Pair<HRegionInfo, HServerAddress> pair =
-            MetaReader.metaRowToRegionPair(data);
+          Pair<HRegionInfo, ServerName> pair = MetaReader.metaRowToRegionPair(data);
           if (pair == null) {
             return false;
           }
-          if (!Bytes.equals(pair.getFirst().getTableDesc().getName(),
-                tableName)) {
+          if (!Bytes.equals(pair.getFirst().getTableDesc().getName(), tableName)) {
             return false;
           }
           result.set(pair);
@@ -1095,13 +1064,11 @@ implements HMasterInterface, HMasterRegi
    * @return cluster status
    */
   public ClusterStatus getClusterStatus() {
-    ClusterStatus status = new ClusterStatus();
-    status.setHBaseVersion(VersionInfo.getVersion());
-    status.setServerInfo(serverManager.getOnlineServers().values());
-    status.setDeadServers(serverManager.getDeadServers());
-    status.setRegionsInTransition(assignmentManager.getRegionsInTransition());
-    status.setClusterId(fileSystemManager.getClusterId());
-    return status;
+    return new ClusterStatus(VersionInfo.getVersion(),
+      this.fileSystemManager.getClusterId(),
+      this.serverManager.getOnlineServers(),
+      this.serverManager.getDeadServers(),
+      this.assignmentManager.getRegionsInTransition());
   }
 
   public String getClusterId() {
@@ -1183,8 +1150,8 @@ implements HMasterInterface, HMasterRegi
   }
 
   @Override
-  public String getServerName() {
-    return address.toString();
+  public ServerName getServerName() {
+    return this.serverName;
   }
 
   @Override
@@ -1274,7 +1241,7 @@ implements HMasterInterface, HMasterRegi
         return;
       }
     }
-    Pair<HRegionInfo, HServerAddress> pair =
+    Pair<HRegionInfo, ServerName> pair =
       MetaReader.getRegion(this.catalogTracker, regionName);
     if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
     assignRegion(pair.getFirst());
@@ -1295,7 +1262,7 @@ implements HMasterInterface, HMasterRegi
         return;
       }
     }
-    Pair<HRegionInfo, HServerAddress> pair =
+    Pair<HRegionInfo, ServerName> pair =
       MetaReader.getRegion(this.catalogTracker, regionName);
     if (pair == null) throw new UnknownRegionException(Bytes.toString(regionName));
     HRegionInfo hri = pair.getFirst();
@@ -1307,6 +1274,16 @@ implements HMasterInterface, HMasterRegi
   }
 
   /**
+   * Compute the average load across all region servers.
+   * Currently, this uses a very naive computation - just uses the number of
+   * regions being served, ignoring stats about number of requests.
+   * @return the average load
+   */
+  public double getAverageLoad() {
+    return this.assignmentManager.getAverageLoad();
+  }
+
+  /**
    * Utility for constructing an instance of the passed HMaster class.
    * @param masterClass
    * @param conf
@@ -1331,7 +1308,6 @@ implements HMasterInterface, HMasterRegi
     }
   }
 
-
   /**
    * @see org.apache.hadoop.hbase.master.HMasterCommandLine
    */



Mime
View raw message