hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r991397 [6/15] - in /hbase/trunk: ./ bin/ conf/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/avro/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/or...
Date Tue, 31 Aug 2010 23:51:50 GMT
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,155 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+
+/**
+ * Chore that, each time it runs, deletes HLog files from the old logs folder
+ * that every log cleaner in the chain reports as deletable. To limit the
+ * number of delete commands issued at once, it deletes at most
+ * {@code hbase.master.logcleaner.maxdeletedlogs} (default 20) files per run.
+ */
+public class LogCleaner extends Chore {
+  static final Log LOG = LogFactory.getLog(LogCleaner.class.getName());
+
+  // Max number we can delete on every chore, this is to make sure we don't
+  // issue thousands of delete commands around the same time
+  private final int maxDeletedLogs;
+  // Filesystem that holds the archived logs.
+  private final FileSystem fs;
+  // Directory the region servers archive their old logs into; this is the
+  // directory scanned by each chore() run.
+  private final Path oldLogDir;
+  // Chain of delegates; a log file is deleted only if EVERY delegate in the
+  // chain reports it deletable.
+  private List<LogCleanerDelegate> logCleanersChain;
+  private final Configuration conf;
+
+  /**
+   * Instantiate the chore and build the cleaner chain from configuration.
+   * @param p the period of time to sleep between each run
+   * @param s the stopper
+   * @param conf configuration to use
+   * @param fs handle to the FS
+   * @param oldLogDir the path to the archived logs
+   */
+  public LogCleaner(final int p, final Stoppable s,
+                        Configuration conf, FileSystem fs,
+                        Path oldLogDir) {
+    super("LogsCleaner", p, s);
+
+    this.maxDeletedLogs =
+        conf.getInt("hbase.master.logcleaner.maxdeletedlogs", 20);
+    this.fs = fs;
+    this.oldLogDir = oldLogDir;
+    this.conf = conf;
+    this.logCleanersChain = new LinkedList<LogCleanerDelegate>();
+
+    initLogCleanersChain();
+  }
+
+  /*
+   * Initialize the chain of log cleaners from the configuration key
+   * "hbase.master.logcleaner.plugins". The default three
+   * LogCleanerDelegates in this chain are: TimeToLiveLogCleaner,
+   * ReplicationLogCleaner and SnapshotLogCleaner.
+   */
+  private void initLogCleanersChain() {
+    String[] logCleaners = conf.getStrings("hbase.master.logcleaner.plugins");
+    if (logCleaners != null) {
+      for (String className : logCleaners) {
+        // newLogCleaner returns null on failure; addLogCleaner skips nulls,
+        // so a broken plugin class does not abort chain construction.
+        LogCleanerDelegate logCleaner = newLogCleaner(className, conf);
+        addLogCleaner(logCleaner);
+      }
+    }
+  }
+
+  /**
+   * A utility method to create new instances of LogCleanerDelegate based
+   * on the class name of the LogCleanerDelegate.
+   * @param className fully qualified class name of the LogCleanerDelegate
+   * @param conf configuration handed to the new delegate via setConf
+   * @return the new instance, or null if it could not be instantiated
+   */
+  public static LogCleanerDelegate newLogCleaner(String className, Configuration conf) {
+    try {
+      Class c = Class.forName(className);
+      LogCleanerDelegate cleaner = (LogCleanerDelegate) c.newInstance();
+      cleaner.setConf(conf);
+      return cleaner;
+    } catch(Exception e) {
+      LOG.warn("Can NOT create LogCleanerDelegate: " + className, e);
+      // skipping if can't instantiate
+      return null;
+    }
+  }
+
+  /**
+   * Add a LogCleanerDelegate to the log cleaner chain. A log file is deletable
+   * if it is deletable for each LogCleanerDelegate in the chain. Null and
+   * already-present delegates are silently ignored.
+   * @param logCleaner the delegate to append, may be null
+   */
+  public void addLogCleaner(LogCleanerDelegate logCleaner) {
+    if (logCleaner != null && !logCleanersChain.contains(logCleaner)) {
+      logCleanersChain.add(logCleaner);
+      LOG.debug("Add log cleaner in chain: " + logCleaner.getClass().getName());
+    }
+  }
+
+  @Override
+  protected void chore() {
+    try {
+      // NOTE(review): listStatus may return null if oldLogDir does not
+      // exist, which would NPE in the loop below — confirm the dir is
+      // always created before this chore starts.
+      FileStatus[] files = this.fs.listStatus(this.oldLogDir);
+      int nbDeletedLog = 0;
+      // Labeled loop so an inner delegate veto can skip to the next file.
+      FILE: for (FileStatus file : files) {
+        Path filePath = file.getPath();
+        if (HLog.validateHLogFilename(filePath.getName())) {
+          for (LogCleanerDelegate logCleaner : logCleanersChain) {
+            if (!logCleaner.isLogDeletable(filePath) ) {
+              // this log is not deletable, continue to process next log file
+              continue FILE;
+            }
+          }
+          // delete this log file if it passes all the log cleaners
+          this.fs.delete(filePath, true);
+          nbDeletedLog++;
+        } else {
+          // Files that do not look like HLogs are removed unconditionally
+          // (they still count against the per-run cap).
+          LOG.warn("Found a wrongly formated file: "
+              + file.getPath().getName());
+          this.fs.delete(filePath, true);
+          nbDeletedLog++;
+        }
+        // Cap deletes per run; remaining files wait for the next invocation.
+        if (nbDeletedLog >= maxDeletedLogs) {
+          break;
+        }
+      }
+    } catch (IOException e) {
+      // Unwrap remote exceptions before logging; failures are non-fatal
+      // because the next chore run will retry.
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.warn("Error while cleaning the logs", e);
+    }
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/LogCleanerDelegate.java Tue Aug 31 23:51:44 2010
@@ -45,5 +45,4 @@ public interface LogCleanerDelegate exte
    * @return true if the log is deletable, false if not
    */
   public boolean isLogDeletable(Path filePath);
-}
-
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,279 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+
+/**
+ * This class abstracts a bunch of operations the HMaster needs to interact with
+ * the underlying file system, including splitting log files, checking file
+ * system status, etc.
+ */
+public class MasterFileSystem {
+  private static final Log LOG = LogFactory.getLog(MasterFileSystem.class.getName());
+  // HBase configuration
+  Configuration conf;
+  // master status
+  Server master;
+  // Keep around for convenience.
+  private final FileSystem fs;
+  // Is the filesystem ok?  Flipped to false permanently by checkFileSystem()
+  // once an availability check fails.
+  private volatile boolean fsOk = true;
+  // The Path to the old logs dir
+  private final Path oldLogDir;
+  // root hbase directory on the FS
+  private final Path rootdir;
+  // create the split log lock; serializes concurrent splitLog() calls
+  final Lock splitLogLock = new ReentrantLock();
+
+  /**
+   * @param master server whose configuration supplies hbase.rootdir and which
+   * is aborted if the filesystem becomes unavailable
+   * @throws IOException if the filesystem cannot be obtained
+   */
+  public MasterFileSystem(Server master) throws IOException {
+    this.conf = master.getConfiguration();
+    this.master = master;
+    // Set filesystem to be that of this.rootdir else we get complaints about
+    // mismatched filesystems if hbase.rootdir is hdfs and fs.defaultFS is
+    // default localfs.  Presumption is that rootdir is fully-qualified before
+    // we get to here with appropriate fs scheme.
+    this.rootdir = FSUtils.getRootDir(conf);
+    // Cover both bases, the old way of setting default fs and the new.
+    // We're supposed to run on 0.20 and 0.21 anyways.
+    conf.set("fs.default.name", this.rootdir.toString());
+    conf.set("fs.defaultFS", this.rootdir.toString());
+    // setup the filesystem variable
+    this.fs = FileSystem.get(conf);
+    // set up the archived logs path
+    this.oldLogDir = new Path(this.rootdir, HConstants.HREGION_OLDLOGDIR_NAME);
+  }
+
+  /**
+   * <ol>
+   * <li>Check if the root region exists and is readable, if not create it</li>
+   * <li>Create a log archive directory for RS to put archived logs</li>
+   * </ol>
+   * @throws IOException if a filesystem operation fails
+   */
+  public void initialize() throws IOException {
+    // check if the root directory exists
+    checkRootDir(this.rootdir, conf, this.fs);
+
+    // Make sure the region servers can archive their old logs
+    if(!this.fs.exists(this.oldLogDir)) {
+      this.fs.mkdirs(this.oldLogDir);
+    }
+  }
+
+  /** @return the filesystem backing hbase.rootdir */
+  public FileSystem getFileSystem() {
+    return this.fs;
+  }
+
+  /**
+   * Get the directory where old logs go
+   * @return the dir
+   */
+  public Path getOldLogDir() {
+    return this.oldLogDir;
+  }
+
+  /**
+   * Checks to see if the file system is still accessible.
+   * If not, sets closed and aborts the master.
+   * @return false if file system is not available
+   */
+  public boolean checkFileSystem() {
+    if (this.fsOk) {
+      try {
+        FSUtils.checkFileSystemAvailable(this.fs);
+      } catch (IOException e) {
+        master.abort("Shutting down HBase cluster: file system not available", e);
+        this.fsOk = false;
+      }
+    }
+    return this.fsOk;
+  }
+
+  /**
+   * @return HBase root dir.
+   */
+  public Path getRootDir() {
+    return this.rootdir;
+  }
+
+  /**
+   * Inspect the log directory to recover any log file without
+   * an active region server.  Folders whose name matches no online server
+   * are split; the rest are left for their owners.
+   * @param onlineServers Map of online servers keyed by
+   * {@link HServerInfo#getServerName()}
+   */
+  void splitLogAfterStartup(final Map<String, HServerInfo> onlineServers) {
+    Path logsDirPath = new Path(this.rootdir, HConstants.HREGION_LOGDIR_NAME);
+    try {
+      if (!this.fs.exists(logsDirPath)) {
+        return;
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Failed exists test on " + logsDirPath, e);
+    }
+    FileStatus[] logFolders;
+    try {
+      logFolders = this.fs.listStatus(logsDirPath);
+    } catch (IOException e) {
+      throw new RuntimeException("Failed listing " + logsDirPath.toString(), e);
+    }
+    if (logFolders == null || logFolders.length == 0) {
+      LOG.debug("No log files to split, proceeding...");
+      return;
+    }
+    for (FileStatus status : logFolders) {
+      // Each per-server log folder is named after its owning server.
+      String serverName = status.getPath().getName();
+      LOG.info("Found log folder : " + serverName);
+      if(onlineServers.get(serverName) == null) {
+        LOG.info("Log folder doesn't belong " +
+          "to a known region server, splitting");
+        splitLog(serverName);
+      } else {
+        LOG.info("Log folder belongs to an existing region server");
+      }
+    }
+  }
+
+  /**
+   * Split the WAL directory of a (dead) region server.  Serialized via
+   * splitLogLock so only one split runs at a time; IOExceptions are logged,
+   * not rethrown.
+   * @param serverName name of the server whose log directory to split
+   */
+  public void splitLog(final String serverName) {
+    this.splitLogLock.lock();
+    Path logDir = new Path(this.rootdir, HLog.getHLogDirectoryName(serverName));
+    try {
+      HLog.splitLog(this.rootdir, logDir, oldLogDir, this.fs, conf);
+    } catch (IOException e) {
+      LOG.error("Failed splitting " + logDir.toString(), e);
+    } finally {
+      this.splitLogLock.unlock();
+    }
+  }
+
+  /**
+   * Get the rootdir.  Make sure its wholesome and exists before returning.
+   * @param rd root directory path
+   * @param c configuration
+   * @param fs filesystem the root directory lives on
+   * @return hbase.rootdir (after checks for existence and bootstrapping if
+   * needed populating the directory with necessary bootup files).
+   * @throws IOException if a filesystem operation fails
+   */
+  private static Path checkRootDir(final Path rd, final Configuration c,
+    final FileSystem fs)
+  throws IOException {
+    // If FS is in safe mode wait till out of it.
+    FSUtils.waitOnSafeMode(c, c.getInt(HConstants.THREAD_WAKE_FREQUENCY,
+        10 * 1000));
+    // Filesystem is good. Go ahead and check for hbase.rootdir.
+    if (!fs.exists(rd)) {
+      fs.mkdirs(rd);
+      FSUtils.setVersion(fs, rd);
+    } else {
+      FSUtils.checkVersion(fs, rd, true);
+    }
+    // Make sure the root region directory exists!
+    if (!FSUtils.rootRegionExists(fs, rd)) {
+      bootstrap(rd, c);
+    }
+    return rd;
+  }
+
+  /**
+   * Create the initial ROOT and first META regions on a fresh install.
+   * @param rd root directory to create the regions under
+   * @param c configuration
+   * @throws IOException if region creation fails
+   */
+  private static void bootstrap(final Path rd, final Configuration c)
+  throws IOException {
+    LOG.info("BOOTSTRAP: creating ROOT and first META regions");
+    try {
+      // Bootstrapping, make sure blockcache is off.  Else, one will be
+      // created here in bootstap and it'll need to be cleaned up.  Better to
+      // not make it in first place.  Turn off block caching for bootstrap.
+      // Enable after.
+      HRegionInfo rootHRI = new HRegionInfo(HRegionInfo.ROOT_REGIONINFO);
+      setInfoFamilyCaching(rootHRI, false);
+      HRegionInfo metaHRI = new HRegionInfo(HRegionInfo.FIRST_META_REGIONINFO);
+      setInfoFamilyCaching(metaHRI, false);
+      HRegion root = HRegion.createHRegion(rootHRI, rd, c);
+      HRegion meta = HRegion.createHRegion(metaHRI, rd, c);
+      setInfoFamilyCaching(rootHRI, true);
+      setInfoFamilyCaching(metaHRI, true);
+      // Add first region from the META table to the ROOT region.
+      HRegion.addRegionToMETA(root, meta);
+      root.close();
+      root.getLog().closeAndDelete();
+      meta.close();
+      meta.getLog().closeAndDelete();
+    } catch (IOException e) {
+      e = RemoteExceptionHandler.checkIOException(e);
+      LOG.error("bootstrap", e);
+      throw e;
+    }
+  }
+
+  /**
+   * Set block caching and in-memory on the catalog family of the given region.
+   * @param hri region whose catalog-family descriptor is modified in place
+   * @param b value to set for both block caching and in-memory
+   */
+  private static void setInfoFamilyCaching(final HRegionInfo hri, final boolean b) {
+    for (HColumnDescriptor hcd: hri.getTableDesc().families.values()) {
+      if (Bytes.equals(hcd.getName(), HConstants.CATALOG_FAMILY)) {
+        hcd.setBlockCacheEnabled(b);
+        hcd.setInMemory(b);
+      }
+    }
+  }
+
+  /**
+   * Recursively delete a region's directory from the filesystem.
+   * @param region region whose directory to remove
+   * @throws IOException if the delete fails
+   */
+  public void deleteRegion(HRegionInfo region) throws IOException {
+    fs.delete(HRegion.getRegionDir(rootdir, region), true);
+  }
+
+  /**
+   * Recursively delete a table's directory from the filesystem.
+   * @param tableName name of the table whose directory to remove
+   * @throws IOException if the delete fails
+   */
+  public void deleteTable(byte[] tableName) throws IOException {
+    fs.delete(new Path(rootdir, Bytes.toString(tableName)), true);
+  }
+
+  /**
+   * Intended to persist updated region info to the filesystem; currently a
+   * no-op.
+   * @param region region whose info would be updated
+   */
+  public void updateRegionInfo(HRegionInfo region) {
+    // TODO implement this.  i think this is currently broken in trunk i don't
+    //      see this getting updated.
+    //      @see HRegion.checkRegioninfoOnFilesystem()
+  }
+
+  /**
+   * Recursively delete one column family's store directory within a region.
+   * @param region region the family belongs to
+   * @param familyName name of the family whose store directory to remove
+   * @throws IOException if the delete fails
+   */
+  public void deleteFamily(HRegionInfo region, byte[] familyName)
+  throws IOException {
+    fs.delete(Store.getStoreHomedir(
+        new Path(rootdir, region.getTableDesc().getNameAsString()),
+        region.getEncodedName(), familyName), true);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.TableNotDisabledException;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.executor.ExecutorService;
+
+/**
+ * Services the Master supplies to internal components.
+ */
+public interface MasterServices {
+  /**
+   * @return Master's instance of the {@link AssignmentManager}
+   */
+  public AssignmentManager getAssignmentManager();
+
+  /**
+   * @return Master's filesystem {@link MasterFileSystem} utility class.
+   */
+  public MasterFileSystem getMasterFileSystem();
+
+  /**
+   * @return Master's {@link ServerManager} instance.
+   */
+  public ServerManager getServerManager();
+
+  /**
+   * @return Master's instance of {@link ExecutorService}
+   */
+  public ExecutorService getExecutorService();
+
+  /**
+   * Check table is modifiable; i.e. exists and is offline.
+   * @param tableName Name of table to check.
+   * @throws TableNotDisabledException if the table is online
+   * @throws TableNotFoundException if no such table exists
+   * @throws IOException on other failures while checking the table
+   */
+  public void checkTableModifiable(final byte [] tableName) throws IOException;
+}

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java?rev=991397&r1=991396&r2=991397&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java Tue Aug 31 23:51:44 2010
@@ -19,144 +19,127 @@
  */
 package org.apache.hadoop.hbase.master;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Chore;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HMsg;
 import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HServerInfo;
 import org.apache.hadoop.hbase.HServerLoad;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.PleaseHoldException;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.YouAreDeadException;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ServerConnection;
+import org.apache.hadoop.hbase.client.ServerConnectionManager;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
-import org.apache.hadoop.hbase.master.RegionManager.RegionState;
-import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
+import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.util.Threads;
-import org.apache.zookeeper.WatchedEvent;
-import org.apache.zookeeper.Watcher;
-import org.apache.zookeeper.Watcher.Event.EventType;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * The ServerManager class manages info about region servers - HServerInfo,
  * load numbers, dying servers, etc.
+ * <p>
+ * Maintains lists of online and dead servers.  Processes the startups,
+ * shutdowns, and deaths of region servers.
+ * <p>
+ * Servers are distinguished in two different ways.  A given server has a
+ * location, specified by hostname and port, and of which there can only be one
+ * online at any given time.  A server instance is specified by the location
+ * (hostname and port) as well as the startcode (timestamp from when the server
+ * was started).  This is used to differentiate a restarted instance of a given
+ * server from the original instance.
  */
 public class ServerManager {
-  private static final Log LOG =
-    LogFactory.getLog(ServerManager.class.getName());
+  private static final Log LOG = LogFactory.getLog(ServerManager.class);
 
-  private final AtomicInteger quiescedServers = new AtomicInteger(0);
+  // Set if we are to shutdown the cluster.
+  private volatile boolean clusterShutdown = false;
 
-  // The map of known server names to server info
-  private final Map<String, HServerInfo> serversToServerInfo =
+  /** The map of known server names to server info */
+  private final Map<String, HServerInfo> onlineServers =
     new ConcurrentHashMap<String, HServerInfo>();
 
-  /*
-   * Set of known dead servers.  On znode expiration, servers are added here.
-   * This is needed in case of a network partitioning where the server's lease
-   * expires, but the server is still running. After the network is healed,
-   * and it's server logs are recovered, it will be told to call server startup
-   * because by then, its regions have probably been reassigned.
-   */
-  private final Set<String> deadServers =
-    Collections.synchronizedSet(new HashSet<String>());
-
-  // SortedMap server load -> Set of server names
-  private final SortedMap<HServerLoad, Set<String>> loadToServers =
-    Collections.synchronizedSortedMap(new TreeMap<HServerLoad, Set<String>>());
-  // Map of server names -> server load
-  private final Map<String, HServerLoad> serversToLoad =
-    new ConcurrentHashMap<String, HServerLoad>();
-
-  private HMaster master;
-
-  /* The regionserver will not be assigned or asked close regions if it
-   * is currently opening >= this many regions.
+  // TODO: This is strange to have two maps but HSI above is used on both sides
+  /**
+   * Map from full server-instance name to the RPC connection for this server.
    */
-  private final int nobalancingCount;
+  private final Map<String, HRegionInterface> serverConnections =
+    new HashMap<String, HRegionInterface>();
+
+  private final Server master;
+  private final MasterServices services;
 
   private final ServerMonitor serverMonitorThread;
 
   private int minimumServerCount;
 
-  private final LogsCleaner logCleaner;
+  private final LogCleaner logCleaner;
 
-  /*
+  // Reporting to track master metrics.
+  private final MasterMetrics metrics;
+
+  private final DeadServer deadservers = new DeadServer();
+
+  /**
    * Dumps into log current stats on dead servers and number of servers
    * TODO: Make this a metric; dump metrics into log.
    */
   class ServerMonitor extends Chore {
-    ServerMonitor(final int period, final AtomicBoolean stop) {
-      super("ServerMonitor", period, stop);
+    ServerMonitor(final int period, final Stoppable stopper) {
+      super("ServerMonitor", period, stopper);
     }
 
     @Override
     protected void chore() {
-      int numServers = serversToServerInfo.size();
-      int numDeadServers = deadServers.size();
+      int numServers = numServers();
+      int numDeadServers = deadservers.size();
       double averageLoad = getAverageLoad();
-      String deadServersList = null;
-      if (numDeadServers > 0) {
-        StringBuilder sb = new StringBuilder("Dead Server [");
-        boolean first = true;
-        synchronized (deadServers) {
-          for (String server: deadServers) {
-            if (!first) {
-              sb.append(",  ");
-              first = false;
-            }
-            sb.append(server);
-          }
-        }
-        sb.append("]");
-        deadServersList = sb.toString();
-      }
+      String deadServersList = deadservers.toString();
       LOG.info(numServers + " region servers, " + numDeadServers +
         " dead, average load " + averageLoad +
-        (deadServersList != null? deadServers: ""));
+        ((deadServersList != null && deadServersList.length() > 0)?
+          deadServersList: ""));
     }
   }
 
   /**
    * Constructor.
    * @param master
+   * @param services
    */
-  public ServerManager(HMaster master) {
+  public ServerManager(final Server master, final MasterServices services) {
     this.master = master;
+    this.services = services;
     Configuration c = master.getConfiguration();
-    this.nobalancingCount = c.getInt("hbase.regions.nobalancing.count", 4);
     int metaRescanInterval = c.getInt("hbase.master.meta.thread.rescanfrequency",
       60 * 1000);
-    this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 0);
-    this.serverMonitorThread = new ServerMonitor(metaRescanInterval,
-      this.master.getShutdownRequested());
+    this.minimumServerCount = c.getInt("hbase.regions.server.count.min", 1);
+    this.metrics = new MasterMetrics(master.getServerName());
+    this.serverMonitorThread = new ServerMonitor(metaRescanInterval, master);
     String n = Thread.currentThread().getName();
     Threads.setDaemonThreadRunning(this.serverMonitorThread,
       n + ".serverMonitor");
-    this.logCleaner = new LogsCleaner(
+    this.logCleaner = new LogCleaner(
       c.getInt("hbase.master.meta.thread.rescanfrequency",60 * 1000),
-        this.master.getShutdownRequested(), c,
-        master.getFileSystem(), master.getOldLogDir());
+      master, c, this.services.getMasterFileSystem().getFileSystem(),
+      this.services.getMasterFileSystem().getOldLogDir());
     Threads.setDaemonThreadRunning(logCleaner,
       n + ".oldLogCleaner");
-
   }
 
   /**
@@ -175,7 +158,8 @@ public class ServerManager {
     // for processing by ProcessServerShutdown.
     HServerInfo info = new HServerInfo(serverInfo);
     String hostAndPort = info.getServerAddress().toString();
-    HServerInfo existingServer = haveServerWithSameHostAndPortAlready(info.getHostnamePort());
+    HServerInfo existingServer =
+      haveServerWithSameHostAndPortAlready(info.getHostnamePort());
     if (existingServer != null) {
       String message = "Server start rejected; we already have " + hostAndPort +
         " registered; existingServer=" + existingServer + ", newServer=" + info;
@@ -192,8 +176,8 @@ public class ServerManager {
   }
 
   private HServerInfo haveServerWithSameHostAndPortAlready(final String hostnamePort) {
-    synchronized (this.serversToServerInfo) {
-      for (Map.Entry<String, HServerInfo> e: this.serversToServerInfo.entrySet()) {
+    synchronized (this.onlineServers) {
+      for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
         if (e.getValue().getHostnamePort().equals(hostnamePort)) {
           return e.getValue();
         }
@@ -202,7 +186,7 @@ public class ServerManager {
     return null;
   }
 
-  /*
+  /**
    * If this server is on the dead list, reject it with a LeaseStillHeldException
    * @param serverName Server name formatted as host_port_startcode.
    * @param what START or REPORT
@@ -210,7 +194,7 @@ public class ServerManager {
    */
   private void checkIsDead(final String serverName, final String what)
   throws YouAreDeadException {
-    if (!isDead(serverName)) return;
+    if (!this.deadservers.isDeadServer(serverName)) return;
     String message = "Server " + what + " rejected; currently processing " +
       serverName + " as dead server";
     LOG.debug(message);
@@ -222,7 +206,7 @@ public class ServerManager {
    * @param info The region server informations
    */
   public void recordNewServer(HServerInfo info) {
-    recordNewServer(info, false);
+    recordNewServer(info, false, null);
   }
 
   /**
@@ -231,23 +215,18 @@ public class ServerManager {
    * @param useInfoLoad True if the load from the info should be used
    *                    like under a master failover
    */
-  void recordNewServer(HServerInfo info, boolean useInfoLoad) {
+  void recordNewServer(HServerInfo info, boolean useInfoLoad,
+      HRegionInterface hri) {
     HServerLoad load = useInfoLoad ? info.getLoad() : new HServerLoad();
     String serverName = info.getServerName();
     info.setLoad(load);
-    // We must set this watcher here because it can be set on a fresh start
-    // or on a failover
-    Watcher watcher = new ServerExpirer(new HServerInfo(info));
-    this.master.getZooKeeperWrapper().updateRSLocationGetWatch(info, watcher);
-    this.serversToServerInfo.put(serverName, info);
-    this.serversToLoad.put(serverName, load);
-    synchronized (this.loadToServers) {
-      Set<String> servers = this.loadToServers.get(load);
-      if (servers == null) {
-        servers = new HashSet<String>();
-      }
-      servers.add(serverName);
-      this.loadToServers.put(load, servers);
+    // TODO: Why did we update the RS location ourself?  Shouldn't RS do this?
+    // masterStatus.getZooKeeper().updateRSLocationGetWatch(info, watcher);
+    onlineServers.put(serverName, info);
+    if(hri == null) {
+      serverConnections.remove(serverName);
+    } else {
+      serverConnections.put(serverName, hri);
     }
   }
 
@@ -265,130 +244,76 @@ public class ServerManager {
    * @throws IOException
    */
   HMsg [] regionServerReport(final HServerInfo serverInfo,
-    final HMsg msgs[], final HRegionInfo[] mostLoadedRegions)
+    final HMsg [] msgs, final HRegionInfo[] mostLoadedRegions)
   throws IOException {
+    // Be careful. This method has returns in the middle.
     HServerInfo info = new HServerInfo(serverInfo);
+
+    // Check if dead.  If it is, it'll get a 'You Are Dead!' exception.
     checkIsDead(info.getServerName(), "REPORT");
-    if (msgs.length > 0) {
-      if (msgs[0].isType(HMsg.Type.MSG_REPORT_EXITING)) {
-        processRegionServerExit(info, msgs);
-        return HMsg.EMPTY_HMSG_ARRAY;
-      } else if (msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
-        LOG.info("Region server " + info.getServerName() + " quiesced");
-        this.quiescedServers.incrementAndGet();
-      }
-    }
-    if (this.master.getShutdownRequested().get()) {
-      if (quiescedServers.get() >= serversToServerInfo.size()) {
-        // If the only servers we know about are meta servers, then we can
-        // proceed with shutdown
-        LOG.info("All user tables quiesced. Proceeding with shutdown");
-        this.master.startShutdown();
-      }
-      if (!this.master.isClosed()) {
-        if (msgs.length > 0 &&
-            msgs[0].isType(HMsg.Type.MSG_REPORT_QUIESCED)) {
-          // Server is already quiesced, but we aren't ready to shut down
-          // return empty response
-          return HMsg.EMPTY_HMSG_ARRAY;
-        }
-        // Tell the server to stop serving any user regions
-        return new HMsg [] {HMsg.REGIONSERVER_QUIESCE};
-      }
-    }
-    if (this.master.isClosed()) {
-      // Tell server to shut down if we are shutting down.  This should
-      // happen after check of MSG_REPORT_EXITING above, since region server
-      // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
-      return new HMsg [] {HMsg.REGIONSERVER_STOP};
-    }
 
-    HServerInfo storedInfo = this.serversToServerInfo.get(info.getServerName());
+    // If we don't know this server, tell it shutdown.
+    HServerInfo storedInfo = this.onlineServers.get(info.getServerName());
     if (storedInfo == null) {
       LOG.warn("Received report from unknown server -- telling it " +
-        "to " + HMsg.REGIONSERVER_STOP + ": " + info.getServerName());
-      // The HBaseMaster may have been restarted.
-      // Tell the RegionServer to abort!
-      return new HMsg[] {HMsg.REGIONSERVER_STOP};
-    } else if (storedInfo.getStartCode() != info.getStartCode()) {
-      // This state is reachable if:
-      //
-      // 1) RegionServer A started
-      // 2) RegionServer B started on the same machine, then
-      //    clobbered A in regionServerStartup.
-      // 3) RegionServer A returns, expecting to work as usual.
-      //
-      // The answer is to ask A to shut down for good.
+        "to " + HMsg.Type.STOP_REGIONSERVER + ": " + info.getServerName());
+      return HMsg.STOP_REGIONSERVER_ARRAY;
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("region server race condition detected: " +
-            info.getServerName());
-      }
+    // Check startcodes
+    if (raceThatShouldNotHappenAnymore(storedInfo, info)) {
+      return HMsg.STOP_REGIONSERVER_ARRAY;
+    }
+
+    for (HMsg msg: msgs) {
+      LOG.info("Received " + msg);
+      switch (msg.getType()) {
+      case REGION_SPLIT:
+        this.services.getAssignmentManager().handleSplitReport(serverInfo,
+            msg.getRegionInfo(), msg.getDaughterA(), msg.getDaughterB());
+        break;
 
-      synchronized (this.serversToServerInfo) {
-        removeServerInfo(info.getServerName());
-        notifyServers();
+        default:
+          LOG.error("Unhandled msg type " + msg);
       }
+    }
 
-      return new HMsg[] {HMsg.REGIONSERVER_STOP};
-    } else {
-      return processRegionServerAllsWell(info, mostLoadedRegions, msgs);
+    HMsg [] reply = null;
+    int numservers = numServers();
+    if (this.clusterShutdown) {
+      if (numservers <= 2) {
+        // Shutdown needs to be staggered; the meta regions need to close last
+        // in case they need to be updated during the close melee.  If <= 2
+        // servers left, then these are the two that were carrying root and meta
+        // most likely (TODO: This presumes unsplittable meta -- FIX). Tell
+        // these servers can shutdown now too.
+        reply = HMsg.STOP_REGIONSERVER_ARRAY;
+      }
     }
+    return processRegionServerAllsWell(info, mostLoadedRegions, reply);
   }
 
-  /*
-   * Region server is exiting with a clean shutdown.
-   *
-   * In this case, the server sends MSG_REPORT_EXITING in msgs[0] followed by
-   * a MSG_REPORT_CLOSE for each region it was serving.
-   * @param serverInfo
-   * @param msgs
-   */
-  private void processRegionServerExit(HServerInfo serverInfo, HMsg[] msgs) {
-    synchronized (this.serversToServerInfo) {
-      // This method removes ROOT/META from the list and marks them to be
-      // reassigned in addition to other housework.
-      if (removeServerInfo(serverInfo.getServerName())) {
-        // Only process the exit message if the server still has registered info.
-        // Otherwise we could end up processing the server exit twice.
-        LOG.info("Region server " + serverInfo.getServerName() +
-          ": MSG_REPORT_EXITING");
-        // Get all the regions the server was serving reassigned
-        // (if we are not shutting down).
-        if (!master.closed.get()) {
-          for (int i = 1; i < msgs.length; i++) {
-            LOG.info("Processing " + msgs[i] + " from " +
-              serverInfo.getServerName());
-            assert msgs[i].getType() == HMsg.Type.MSG_REGION_CLOSE;
-            HRegionInfo info = msgs[i].getRegionInfo();
-            // Meta/root region offlining is handed in removeServerInfo above.
-            if (!info.isMetaRegion()) {
-              synchronized (master.getRegionManager()) {
-                if (!master.getRegionManager().isOfflined(info.getRegionNameAsString())) {
-                  master.getRegionManager().setUnassigned(info, true);
-                } else {
-                  master.getRegionManager().removeRegion(info);
-                }
-              }
-            }
-          }
-        }
-        // There should not be any regions in transition for this server - the
-        // server should finish transitions itself before closing
-        Map<String, RegionState> inTransition = master.getRegionManager()
-            .getRegionsInTransitionOnServer(serverInfo.getServerName());
-        for (Map.Entry<String, RegionState> entry : inTransition.entrySet()) {
-          LOG.warn("Region server " + serverInfo.getServerName()
-              + " shut down with region " + entry.getKey() + " in transition "
-              + "state " + entry.getValue());
-          master.getRegionManager().setUnassigned(entry.getValue().getRegionInfo(),
-              true);
-        }
+  private boolean raceThatShouldNotHappenAnymore(final HServerInfo storedInfo,
+      final HServerInfo reportedInfo) {
+    if (storedInfo.getStartCode() != reportedInfo.getStartCode()) {
+      // TODO: I don't think this possible any more.  We check startcodes when
+      // server comes in on regionServerStartup -- St.Ack
+      // This state is reachable if:
+      // 1) RegionServer A started
+      // 2) RegionServer B started on the same machine, then clobbered A in regionServerStartup.
+      // 3) RegionServer A returns, expecting to work as usual.
+      // The answer is to ask A to shut down for good.
+      LOG.warn("Race condition detected: " + reportedInfo.getServerName());
+      synchronized (this.onlineServers) {
+        removeServerInfo(reportedInfo.getServerName());
+        notifyOnlineServers();
       }
+      return true;
     }
+    return false;
   }
 
-  /*
+  /**
    *  RegionServer is checking in, no exceptional circumstances
    * @param serverInfo
    * @param mostLoadedRegions
@@ -400,314 +325,25 @@ public class ServerManager {
       final HRegionInfo[] mostLoadedRegions, HMsg[] msgs)
   throws IOException {
     // Refresh the info object and the load information
-    this.serversToServerInfo.put(serverInfo.getServerName(), serverInfo);
-    HServerLoad load = this.serversToLoad.get(serverInfo.getServerName());
-    if (load != null) {
-      this.master.getMetrics().incrementRequests(load.getNumberOfRequests());
-      if (!load.equals(serverInfo.getLoad())) {
-        updateLoadToServers(serverInfo.getServerName(), load);
-      }
-    }
-
-    // Set the current load information
-    load = serverInfo.getLoad();
-    this.serversToLoad.put(serverInfo.getServerName(), load);
-    synchronized (loadToServers) {
-      Set<String> servers = this.loadToServers.get(load);
-      if (servers == null) {
-        servers = new HashSet<String>();
-      }
-      servers.add(serverInfo.getServerName());
-      this.loadToServers.put(load, servers);
-    }
-
-    // Next, process messages for this server
-    return processMsgs(serverInfo, mostLoadedRegions, msgs);
-  }
-
-  /*
-   * Process all the incoming messages from a server that's contacted us.
-   * Note that we never need to update the server's load information because
-   * that has already been done in regionServerReport.
-   * @param serverInfo
-   * @param mostLoadedRegions
-   * @param incomingMsgs
-   * @return
-   */
-  private HMsg[] processMsgs(HServerInfo serverInfo,
-      HRegionInfo[] mostLoadedRegions, HMsg incomingMsgs[]) {
-    ArrayList<HMsg> returnMsgs = new ArrayList<HMsg>();
-    if (serverInfo.getServerAddress() == null) {
-      throw new NullPointerException("Server address cannot be null; " +
-        "hbase-958 debugging");
-    }
-    // Get reports on what the RegionServer did.
-    // Be careful that in message processors we don't throw exceptions that
-    // break the switch below because then we might drop messages on the floor.
-    int openingCount = 0;
-    for (int i = 0; i < incomingMsgs.length; i++) {
-      HRegionInfo region = incomingMsgs[i].getRegionInfo();
-      LOG.info("Processing " + incomingMsgs[i] + " from " +
-        serverInfo.getServerName() + "; " + (i + 1) + " of " +
-        incomingMsgs.length);
-      if (!this.master.getRegionServerOperationQueue().
-          process(serverInfo, incomingMsgs[i])) {
-        continue;
-      }
-      switch (incomingMsgs[i].getType()) {
-        case MSG_REPORT_PROCESS_OPEN:
-          openingCount++;
-          break;
-
-        case MSG_REPORT_OPEN:
-          processRegionOpen(serverInfo, region, returnMsgs);
-          break;
-
-        case MSG_REPORT_CLOSE:
-          processRegionClose(region);
-          break;
-
-        case MSG_REPORT_SPLIT:
-          processSplitRegion(region, incomingMsgs[++i].getRegionInfo(),
-            incomingMsgs[++i].getRegionInfo());
-          break;
-
-        case MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS:
-          processSplitRegion(region, incomingMsgs[i].getDaughterA(),
-            incomingMsgs[i].getDaughterB());
-          break;
-
-        default:
-          LOG.warn("Impossible state during message processing. Instruction: " +
-            incomingMsgs[i].getType());
-      }
-    }
-
-    synchronized (this.master.getRegionManager()) {
-      // Tell the region server to close regions that we have marked for closing.
-      for (HRegionInfo i:
-        this.master.getRegionManager().getMarkedToClose(serverInfo.getServerName())) {
-        returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE, i));
-        // Transition the region from toClose to closing state
-        this.master.getRegionManager().setPendingClose(i.getRegionNameAsString());
-      }
-
-      // Figure out what the RegionServer ought to do, and write back.
-
-      // Should we tell it close regions because its overloaded?  If its
-      // currently opening regions, leave it alone till all are open.
-      if (openingCount < this.nobalancingCount) {
-        this.master.getRegionManager().assignRegions(serverInfo, mostLoadedRegions,
-          returnMsgs);
-      }
-
-      // Send any pending table actions.
-      this.master.getRegionManager().applyActions(serverInfo, returnMsgs);
-    }
-    return returnMsgs.toArray(new HMsg[returnMsgs.size()]);
-  }
-
-  /*
-   * A region has split.
-   *
-   * @param region
-   * @param splitA
-   * @param splitB
-   * @param returnMsgs
-   */
-  private void processSplitRegion(HRegionInfo region, HRegionInfo a, HRegionInfo b) {
-    synchronized (master.getRegionManager()) {
-      // Cancel any actions pending for the affected region.
-      // This prevents the master from sending a SPLIT message if the table
-      // has already split by the region server.
-      this.master.getRegionManager().endActions(region.getRegionName());
-      assignSplitDaughter(a);
-      assignSplitDaughter(b);
-      if (region.isMetaTable()) {
-        // A meta region has split.
-        this. master.getRegionManager().offlineMetaRegionWithStartKey(region.getStartKey());
-        this.master.getRegionManager().incrementNumMetaRegions();
-      }
-    }
-  }
-
-  /*
-   * Assign new daughter-of-a-split UNLESS its already been assigned.
-   * It could have been assigned already in rare case where there was a large
-   * gap between insertion of the daughter region into .META. by the
-   * splitting regionserver and receipt of the split message in master (See
-   * HBASE-1784).
-   * @param hri Region to assign.
-   */
-  private void assignSplitDaughter(final HRegionInfo hri) {
-    MetaRegion mr =
-      this.master.getRegionManager().getFirstMetaRegionForRegion(hri);
-    Get g = new Get(hri.getRegionName());
-    g.addFamily(HConstants.CATALOG_FAMILY);
-    try {
-      HRegionInterface server =
-        this.master.getServerConnection().getHRegionConnection(mr.getServer());
-      Result r = server.get(mr.getRegionName(), g);
-      // If size > 3 -- presume regioninfo, startcode and server -- then presume
-      // that this daughter already assigned and return.
-      if (r.size() >= 3) return;
-    } catch (IOException e) {
-      LOG.warn("Failed get on " + HConstants.CATALOG_FAMILY_STR +
-        "; possible double-assignment?", e);
-    }
-    this.master.getRegionManager().setUnassigned(hri, false);
-  }
-
-  /*
-   * Region server is reporting that a region is now opened
-   * @param serverInfo
-   * @param region
-   * @param returnMsgs
-   */
-  public void processRegionOpen(HServerInfo serverInfo,
-      HRegionInfo region, ArrayList<HMsg> returnMsgs) {
-    boolean duplicateAssignment = false;
-    synchronized (master.getRegionManager()) {
-      if (!this.master.getRegionManager().isUnassigned(region) &&
-          !this.master.getRegionManager().isPendingOpen(region.getRegionNameAsString())) {
-        if (region.isRootRegion()) {
-          // Root region
-          HServerAddress rootServer =
-            this.master.getRegionManager().getRootRegionLocation();
-          if (rootServer != null) {
-            if (rootServer.compareTo(serverInfo.getServerAddress()) == 0) {
-              // A duplicate open report from the correct server
-              return;
-            }
-            // We received an open report on the root region, but it is
-            // assigned to a different server
-            duplicateAssignment = true;
-          }
-        } else {
-          // Not root region. If it is not a pending region, then we are
-          // going to treat it as a duplicate assignment, although we can't
-          // tell for certain that's the case.
-          if (this.master.getRegionManager().isPendingOpen(
-              region.getRegionNameAsString())) {
-            // A duplicate report from the correct server
-            return;
-          }
-          duplicateAssignment = true;
-        }
-      }
-
-      if (duplicateAssignment) {
-        LOG.warn("region server " + serverInfo.getServerAddress().toString() +
-          " should not have opened region " + Bytes.toString(region.getRegionName()));
-
-        // This Region should not have been opened.
-        // Ask the server to shut it down, but don't report it as closed.
-        // Otherwise the HMaster will think the Region was closed on purpose,
-        // and then try to reopen it elsewhere; that's not what we want.
-        returnMsgs.add(new HMsg(HMsg.Type.MSG_REGION_CLOSE_WITHOUT_REPORT,
-          region, "Duplicate assignment".getBytes()));
-      } else {
-        if (region.isRootRegion()) {
-          // it was assigned, and it's not a duplicate assignment, so take it out
-          // of the unassigned list.
-          this.master.getRegionManager().removeRegion(region);
-
-          // Store the Root Region location (in memory)
-          HServerAddress rootServer = serverInfo.getServerAddress();
-          this.master.getServerConnection().setRootRegionLocation(
-            new HRegionLocation(region, rootServer));
-          this.master.getRegionManager().setRootRegionLocation(rootServer);
-        } else {
-          // Note that the table has been assigned and is waiting for the
-          // meta table to be updated.
-          this.master.getRegionManager().setOpen(region.getRegionNameAsString());
-          RegionServerOperation op =
-            new ProcessRegionOpen(master, serverInfo, region);
-          this.master.getRegionServerOperationQueue().put(op);
-        }
-      }
+    this.onlineServers.put(serverInfo.getServerName(), serverInfo);
+    HServerLoad load = serverInfo.getLoad();
+    if (load != null && this.metrics != null) {
+      this.metrics.incrementRequests(load.getNumberOfRequests());
     }
+    // No more piggyback messages on heartbeats for other stuff
+    return msgs;
   }
 
-  /*
-   * @param region
-   * @throws Exception
+  /**
+   * @param serverName
+   * @return True if we removed server from the list.
    */
-  public void processRegionClose(HRegionInfo region) {
-    synchronized (this.master.getRegionManager()) {
-      if (region.isRootRegion()) {
-        // Root region
-        this.master.getRegionManager().unsetRootRegion();
-        if (region.isOffline()) {
-          // Can't proceed without root region. Shutdown.
-          LOG.fatal("root region is marked offline");
-          this.master.shutdown();
-          return;
-        }
-
-      } else if (region.isMetaTable()) {
-        // Region is part of the meta table. Remove it from onlineMetaRegions
-        this.master.getRegionManager().offlineMetaRegionWithStartKey(region.getStartKey());
-      }
-
-      boolean offlineRegion =
-        this.master.getRegionManager().isOfflined(region.getRegionNameAsString());
-      boolean reassignRegion = !region.isOffline() && !offlineRegion;
-
-      // NOTE: If the region was just being closed and not offlined, we cannot
-      //       mark the region unassignedRegions as that changes the ordering of
-      //       the messages we've received. In this case, a close could be
-      //       processed before an open resulting in the master not agreeing on
-      //       the region's state.
-      this.master.getRegionManager().setClosed(region.getRegionNameAsString());
-      RegionServerOperation op =
-        new ProcessRegionClose(master, region, offlineRegion, reassignRegion);
-      this.master.getRegionServerOperationQueue().put(op);
-    }
-  }
-
-  /** Update a server load information because it's shutting down*/
   private boolean removeServerInfo(final String serverName) {
-    boolean infoUpdated = false;
-    HServerInfo info = this.serversToServerInfo.remove(serverName);
-    // Only update load information once.
-    // This method can be called a couple of times during shutdown.
+    HServerInfo info = this.onlineServers.remove(serverName);
     if (info != null) {
-      LOG.info("Removing server's info " + serverName);
-      this.master.getRegionManager().offlineMetaServer(info.getServerAddress());
-
-      //HBASE-1928: Check whether this server has been transitioning the ROOT table
-      if (this.master.getRegionManager().isRootInTransitionOnThisServer(serverName)) {
-         this.master.getRegionManager().unsetRootRegion();
-         this.master.getRegionManager().reassignRootRegion();
-      }
-
-      //HBASE-1928: Check whether this server has been transitioning the META table
-      HRegionInfo metaServerRegionInfo = this.master.getRegionManager().getMetaServerRegionInfo (serverName);
-      if (metaServerRegionInfo != null) {
-         this.master.getRegionManager().setUnassigned(metaServerRegionInfo, true);
-      }
-
-      infoUpdated = true;
-      // update load information
-      updateLoadToServers(serverName, this.serversToLoad.remove(serverName));
-    }
-    return infoUpdated;
-  }
-
-  private void updateLoadToServers(final String serverName,
-      final HServerLoad load) {
-    if (load == null) return;
-    synchronized (this.loadToServers) {
-      Set<String> servers = this.loadToServers.get(load);
-      if (servers != null) {
-        servers.remove(serverName);
-        if (servers.size() > 0)
-          this.loadToServers.put(load, servers);
-        else
-          this.loadToServers.remove(load);
-      }
+      return true;
     }
+    return false;
   }
 
   /**
@@ -720,19 +356,22 @@ public class ServerManager {
     int totalLoad = 0;
     int numServers = 0;
     double averageLoad = 0.0;
-    synchronized (serversToLoad) {
-      numServers = serversToLoad.size();
-      for (HServerLoad load : serversToLoad.values()) {
-        totalLoad += load.getNumberOfRegions();
-      }
-      averageLoad = (double)totalLoad / (double)numServers;
+    for (HServerInfo hsi : onlineServers.values()) {
+        numServers++;
+        totalLoad += hsi.getLoad().getNumberOfRegions();
     }
+    averageLoad = (double)totalLoad / (double)numServers;
     return averageLoad;
   }
 
   /** @return the number of active servers */
   public int numServers() {
-    return this.serversToServerInfo.size();
+    int num = -1;
+    // This synchronized seems gratuitous.
+    synchronized (this.onlineServers) {
+      num = this.onlineServers.size();
+    }
+    return num;
   }
 
   /**
@@ -740,57 +379,43 @@ public class ServerManager {
    * @return HServerInfo for the given server address
    */
   public HServerInfo getServerInfo(String name) {
-    return this.serversToServerInfo.get(name);
+    return this.onlineServers.get(name);
   }
 
   /**
-   * @return Read-only map of servers to serverinfo.
+   * @return Read-only map of servers to serverinfo
    */
-  public Map<String, HServerInfo> getServersToServerInfo() {
-    synchronized (this.serversToServerInfo) {
-      return Collections.unmodifiableMap(this.serversToServerInfo);
+  public Map<String, HServerInfo> getOnlineServers() {
+    // Presumption is that iterating the returned Map is OK.
+    synchronized (this.onlineServers) {
+      return Collections.unmodifiableMap(this.onlineServers);
     }
   }
 
+  public Set<String> getDeadServers() {
+    return this.deadservers.clone();
+  }
+
   /**
    * @param hsa
    * @return The HServerInfo whose HServerAddress is <code>hsa</code> or null
    * if nothing found.
    */
   public HServerInfo getHServerInfo(final HServerAddress hsa) {
-    synchronized(this.serversToServerInfo) {
+    synchronized(this.onlineServers) {
       // TODO: This is primitive.  Do a better search.
-      for (Map.Entry<String, HServerInfo> e: this.serversToServerInfo.entrySet()) {
-        if (e.getValue().getServerAddress().equals(hsa)) return e.getValue();
+      for (Map.Entry<String, HServerInfo> e: this.onlineServers.entrySet()) {
+        if (e.getValue().getServerAddress().equals(hsa)) {
+          return e.getValue();
+        }
       }
     }
     return null;
   }
 
-  /**
-   * @return Read-only map of servers to load.
-   */
-  public Map<String, HServerLoad> getServersToLoad() {
-    synchronized (this.serversToLoad) {
-      return Collections.unmodifiableMap(serversToLoad);
-    }
-  }
-
-  /**
-   * @return Read-only map of load to servers.
-   */
-  public SortedMap<HServerLoad, Set<String>> getLoadToServers() {
-    synchronized (this.loadToServers) {
-      return Collections.unmodifiableSortedMap(this.loadToServers);
-    }
-  }
-
-  /**
-   * Wakes up threads waiting on serversToServerInfo
-   */
-  public void notifyServers() {
-    synchronized (this.serversToServerInfo) {
-      this.serversToServerInfo.notifyAll();
+  private void notifyOnlineServers() {
+    synchronized (this.onlineServers) {
+      this.onlineServers.notifyAll();
     }
   }
 
@@ -801,17 +426,12 @@ public class ServerManager {
    * a MSG_REGIONSERVER_STOP.
    */
   void letRegionServersShutdown() {
-    if (!master.checkFileSystem()) {
-      // Forget waiting for the region servers if the file system has gone
-      // away. Just exit as quickly as possible.
-      return;
-    }
-    synchronized (serversToServerInfo) {
-      while (serversToServerInfo.size() > 0) {
+    synchronized (onlineServers) {
+      while (onlineServers.size() > 0) {
         LOG.info("Waiting on following regionserver(s) to go down " +
-          this.serversToServerInfo.values());
+          this.onlineServers.values());
         try {
-          this.serversToServerInfo.wait(500);
+          this.onlineServers.wait(500);
         } catch (InterruptedException e) {
           // continue
         }
@@ -819,118 +439,145 @@ public class ServerManager {
     }
   }
 
-  /** Watcher triggered when a RS znode is deleted */
-  private class ServerExpirer implements Watcher {
-    private HServerInfo server;
-
-    ServerExpirer(final HServerInfo hsi) {
-      this.server = hsi;
-    }
-
-    public void process(WatchedEvent event) {
-      if (!event.getType().equals(EventType.NodeDeleted)) {
-        LOG.warn("Unexpected event=" + event);
-        return;
-      }
-      LOG.info(this.server.getServerName() + " znode expired");
-      expireServer(this.server);
-    }
-  }
-
   /*
    * Expire the passed server.  Add it to list of deadservers and queue a
    * shutdown processing.
    */
-  private synchronized void expireServer(final HServerInfo hsi) {
+  public synchronized void expireServer(final HServerInfo hsi) {
     // First check a server to expire.  ServerName is of the form:
     // <hostname> , <port> , <startcode>
     String serverName = hsi.getServerName();
-    HServerInfo info = this.serversToServerInfo.get(serverName);
+    HServerInfo info = this.onlineServers.get(serverName);
     if (info == null) {
-      LOG.warn("No HServerInfo for " + serverName);
+      LOG.warn("Received expiration of " + hsi.getServerName() +
+        " but server is not currently online");
       return;
     }
-    if (this.deadServers.contains(serverName)) {
-      LOG.warn("Already processing shutdown of " + serverName);
+    if (this.deadservers.contains(serverName)) {
+      // TODO: Can this happen?  It shouldn't be online in this case?
+      LOG.warn("Received expiration of " + hsi.getServerName() +
+          " but server shutdown is already in progress");
       return;
     }
     // Remove the server from the known servers lists and update load info
-    this.serversToServerInfo.remove(serverName);
-    HServerLoad load = this.serversToLoad.remove(serverName);
-    if (load != null) {
-      synchronized (this.loadToServers) {
-        Set<String> servers = this.loadToServers.get(load);
-        if (servers != null) {
-          servers.remove(serverName);
-          if (servers.isEmpty()) this.loadToServers.remove(load);
-        }
+    this.onlineServers.remove(serverName);
+    this.serverConnections.remove(serverName);
+    // If cluster is going down, yes, servers are going to be expiring; don't
+    // process as a dead server
+    if (this.clusterShutdown) {
+      LOG.info("Cluster shutdown set; " + hsi.getServerName() +
+        " expired; onlineServers=" + this.onlineServers.size());
+      if (this.onlineServers.isEmpty()) {
+        master.stop("Cluster shutdown set; onlineServer=0");
       }
+      return;
     }
-    // Add to dead servers and queue a shutdown processing.
+    this.services.getExecutorService().submit(new ServerShutdownHandler(this.master,
+        this.services, deadservers, info));
     LOG.debug("Added=" + serverName +
-      " to dead servers, added shutdown processing operation");
-    this.deadServers.add(serverName);
-    this.master.getRegionServerOperationQueue().
-      put(new ProcessServerShutdown(master, info));
+      " to dead servers, submitted shutdown handler to be executed");
   }
 
-  /**
-   * @param serverName
-   */
-  void removeDeadServer(String serverName) {
-    this.deadServers.remove(serverName);
+  public boolean canAssignUserRegions() {
+    if (minimumServerCount == 0) {
+      return true;
+    }
+    return (numServers() >= minimumServerCount);
   }
 
-  /**
-   * @param serverName
-   * @return true if server is dead
-   */
-  public boolean isDead(final String serverName) {
-    return isDead(serverName, false);
+  public void setMinimumServerCount(int minimumServerCount) {
+    this.minimumServerCount = minimumServerCount;
   }
 
+  // RPC methods to region servers
+
   /**
-   * @param serverName Servername as either <code>host:port</code> or
-   * <code>host,port,startcode</code>.
-   * @param hostAndPortOnly True if <code>serverName</code> is host and
-   * port only (<code>host:port</code>) and if so, then we do a prefix compare
-   * (ignoring start codes) looking for dead server.
-   * @return true if server is dead
-   */
-  boolean isDead(final String serverName, final boolean hostAndPortOnly) {
-    return isDead(this.deadServers, serverName, hostAndPortOnly);
+   * Sends an OPEN RPC to the specified server to open the specified region.
+   * <p>
+   * Open should not fail but can if server just crashed.
+   * <p>
+   * @param server server to open a region
+   * @param region region to open
+   */
+  public void sendRegionOpen(HServerInfo server, HRegionInfo region) {
+    HRegionInterface hri = getServerConnection(server);
+    if(hri == null) {
+      LOG.warn("Attempting to send OPEN RPC to server " + server.getServerName()
+          + " failed because no RPC connection found to this server");
+      return;
+    }
+    hri.openRegion(region);
   }
 
-  static boolean isDead(final Set<String> deadServers,
-      final String serverName, final boolean hostAndPortOnly) {
-    return HServerInfo.isServer(deadServers, serverName, hostAndPortOnly);
+  /**
+   * Sends a CLOSE RPC to the specified server to close the specified region.
+   * <p>
+   * A region server could reject the close request because it either does not
+   * have the specified region or the region is being split.
+   * @param server server to close a region
+   * @param region region to close
+   * (No return value; a rejected close results in an exception or a warning log.)
+   * @throws NotServingRegionException
+   */
+  public void sendRegionClose(HServerInfo server, HRegionInfo region)
+  throws NotServingRegionException {
+    HRegionInterface hri = getServerConnection(server);
+    if(hri == null) {
+      LOG.warn("Attempting to send CLOSE RPC to server " + server.getServerName()
+          + " failed because no RPC connection found to this server");
+      return;
+    }
+    hri.closeRegion(region);
   }
 
-  Set<String> getDeadServers() {
-    return this.deadServers;
+  private HRegionInterface getServerConnection(HServerInfo info) {
+    try {
+      ServerConnection connection =
+        ServerConnectionManager.getConnection(this.master.getConfiguration());
+      HRegionInterface hri = serverConnections.get(info.getServerName());
+      if(hri == null) {
+        LOG.info("new connection");
+        hri = connection.getHRegionConnection(info.getServerAddress(), false);
+        serverConnections.put(info.getServerName(), hri);
+      }
+      return hri;
+    } catch (IOException e) {
+      LOG.error("Error connecting to region server", e);
+      throw new RuntimeException("Fatal error connection to RS", e);
+    }
   }
 
   /**
-   * Add to the passed <code>m</code> servers that are loaded less than
-   * <code>l</code>.
-   * @param l
-   * @param m
+   * Waits for the minimum number of servers to be running.
    */
-  void getLightServers(final HServerLoad l,
-      SortedMap<HServerLoad, Set<String>> m) {
-    synchronized (this.loadToServers) {
-      m.putAll(this.loadToServers.headMap(l));
+  public void waitForMinServers() {
+    while(numServers() < minimumServerCount) {
+//        !masterStatus.getShutdownRequested().get()) {
+      LOG.info("Waiting for enough servers to check in.  Currently have " +
+          numServers() + " but need at least " + minimumServerCount);
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        LOG.warn("Got interrupted waiting for servers to check in, looping");
+      }
     }
   }
 
-  public boolean canAssignUserRegions() {
-    if (minimumServerCount == 0) {
-      return true;
-    }
-    return (numServers() >= minimumServerCount);
+  public List<HServerInfo> getOnlineServersList() {
+    // TODO: optimize the load balancer call so we don't need to make a new list
+    return new ArrayList<HServerInfo>(onlineServers.values());
   }
 
-  public void setMinimumServerCount(int minimumServerCount) {
-    this.minimumServerCount = minimumServerCount;
+  public boolean isServerOnline(String serverName) {
+    return onlineServers.containsKey(serverName);
+  }
+
+  public void shutdownCluster() {
+    LOG.info("Cluster shutdown requested");
+    this.clusterShutdown = true;
+  }
+
+  public boolean isClusterShutdown() {
+    return this.clusterShutdown;
   }
-}
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ClosedRegionHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,114 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles CLOSED region event on Master.
+ * <p>
+ * If table is being disabled, deletes ZK unassigned node and removes from
+ * regions in transition.
+ * <p>
+ * Otherwise, assigns the region to another server.
+ */
+public class ClosedRegionHandler extends EventHandler implements TotesHRegionInfo {
+  private static final Log LOG = LogFactory.getLog(ClosedRegionHandler.class);
+
+  private final AssignmentManager assignmentManager;
+  // Transition payload read from the region's unassigned ZK node; used in
+  // the warning log message below.
+  private final RegionTransitionData data;
+  private final HRegionInfo regionInfo;
+
+  // Processing priority: catalog regions (ROOT, META) before user regions.
+  private final ClosedPriority priority;
+
+  private enum ClosedPriority {
+    ROOT (1),
+    META (2),
+    USER (3);
+
+    private final int value;
+    ClosedPriority(int value) {
+      this.value = value;
+    }
+    public int getValue() {
+      return value;
+    }
+  };
+
+  /**
+   * @param server master server; supplies ZooKeeper access and abort()
+   * @param assignmentManager used to offline and (re)assign the region
+   * @param data transition data from the region's unassigned ZK node
+   * @param regionInfo the region reported CLOSED
+   */
+  public ClosedRegionHandler(Server server,
+      AssignmentManager assignmentManager, RegionTransitionData data,
+      HRegionInfo regionInfo) {
+    super(server, EventType.RS2ZK_REGION_CLOSED);
+    this.assignmentManager = assignmentManager;
+    this.data = data;
+    this.regionInfo = regionInfo;
+    if(regionInfo.isRootRegion()) {
+      priority = ClosedPriority.ROOT;
+    } else if(regionInfo.isMetaRegion()) {
+      priority = ClosedPriority.META;
+    } else {
+      priority = ClosedPriority.USER;
+    }
+  }
+
+  @Override
+  public int getPriority() {
+    return priority.getValue();
+  }
+
+  @Override
+  public HRegionInfo getHRegionInfo() {
+    return this.regionInfo;
+  }
+
+  @Override
+  public void process() {
+    LOG.debug("Handling CLOSED event");
+    // Check if this table is being disabled or not
+    // NOTE(review): the full region name is passed here, not a table name --
+    // confirm isTableOfRegionDisabled() expects a region name.
+    if (assignmentManager.isTableOfRegionDisabled(regionInfo.getRegionName())) {
+      // Disabling so should not be reassigned, just delete the CLOSED node
+      LOG.debug("Table being disabled so deleting ZK node and removing from " +
+          "regions in transition, skipping assignment");
+      try {
+        ZKAssign.deleteClosedNode(server.getZooKeeper(),
+            regionInfo.getEncodedName());
+      } catch (KeeperException.NoNodeException nne) {
+        // NOTE(review): returning here skips regionOffline() below -- confirm
+        // region state is cleaned up elsewhere when the node is already gone.
+        LOG.warn("Tried to delete closed node for " + data + " but it does " +
+            "not exist");
+        return;
+      } catch (KeeperException e) {
+        server.abort("Error deleting CLOSED node in ZK", e);
+      }
+      assignmentManager.regionOffline(regionInfo);
+      return;
+    }
+    // ZK Node is in CLOSED state, assign it.
+    assignmentManager.setOffline(regionInfo);
+    assignmentManager.assign(regionInfo);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,53 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * Handler for the master's delete-table operation: removes every region of
+ * the table from META and from the filesystem, then removes the table
+ * directory itself.
+ */
+public class DeleteTableHandler extends TableEventHandler {
+  private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
+
+  public DeleteTableHandler(byte [] tableName, Server server,
+      final MasterServices masterServices) throws IOException {
+    super(EventType.C2M_DELETE_TABLE, tableName, server, masterServices);
+  }
+
+  @Override
+  protected void handleTableOperation(List<HRegionInfo> regions)
+  throws IOException {
+    for (final HRegionInfo region : regions) {
+      LOG.debug("Deleting region " + region + " from META and FS");
+      // Drop the catalog entry first, then the on-disk region data.
+      MetaEditor.deleteRegion(this.server.getCatalogTracker(), region);
+      this.masterServices.getMasterFileSystem().deleteRegion(region);
+    }
+    // All regions gone; now remove the table directory itself.
+    this.masterServices.getMasterFileSystem().deleteTable(tableName);
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/DisableTableHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,83 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * Handler for the master's disable-table operation: marks the table disabled
+ * in the assignment manager, then unassigns each of its online regions.
+ */
+public class DisableTableHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(DisableTableHandler.class);
+
+  private final byte [] tableName;
+  private final String tableNameStr;
+  private final AssignmentManager assignmentManager;
+
+  /**
+   * @param server master server this handler runs on
+   * @param tableName name of the table to disable
+   * @param catalogTracker used to verify the table exists in META
+   * @param assignmentManager used to mark the table disabled and unassign
+   * @throws TableNotFoundException if the table is not present in META
+   * @throws IOException if the META existence check fails
+   */
+  public DisableTableHandler(Server server, byte [] tableName,
+      CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+  throws TableNotFoundException, IOException {
+    super(server, EventType.C2M_DISABLE_TABLE);
+    this.tableName = tableName;
+    this.tableNameStr = Bytes.toString(this.tableName);
+    this.assignmentManager = assignmentManager;
+    // Check if table exists
+    // TODO: do we want to keep this in-memory as well?  i guess this is
+    //       part of old master rewrite, schema to zk to check for table
+    //       existence and such
+    if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
+      throw new TableNotFoundException(this.tableNameStr);
+    }
+  }
+
+  @Override
+  public void process() {
+    try {
+      // Fixed log message typo: "Attemping" -> "Attempting".
+      LOG.info("Attempting to disable the table " + this.tableNameStr);
+      handleDisableTable();
+    } catch (IOException e) {
+      LOG.error("Error trying to disable the table " + this.tableNameStr, e);
+    }
+  }
+
+  // Marks the table disabled, then unassigns every currently online region.
+  private void handleDisableTable() throws IOException {
+    // Set the table as disabled so it doesn't get re-onlined
+    assignmentManager.disableTable(this.tableNameStr);
+    // Get the online regions of this table.
+    // TODO: What if region splitting at the time we get this listing?
+    // TODO: Remove offline flag from HRI
+    // TODO: Confirm we have parallel closing going on.
+    List<HRegionInfo> regions = assignmentManager.getRegionsOfTable(tableName);
+    // Unassign the online regions
+    for (HRegionInfo region : regions) {
+      assignmentManager.unassign(region);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/EnableTableHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
+import org.apache.hadoop.hbase.catalog.MetaReader;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.util.Bytes;
+
+
+/**
+ * Handler for the master's enable-table operation: clears the table's
+ * disabled flag, then assigns each region of the table listed in META.
+ */
+public class EnableTableHandler extends EventHandler {
+  private static final Log LOG = LogFactory.getLog(EnableTableHandler.class);
+
+  private final byte [] tableName;
+  private final String tableNameStr;
+  private final AssignmentManager assignmentManager;
+  private final CatalogTracker ct;
+
+  /**
+   * @param server master server this handler runs on
+   * @param tableName name of the table to enable
+   * @param catalogTracker used to verify the table exists and list regions
+   * @param assignmentManager used to undisable the table and assign regions
+   * @throws TableNotFoundException if the table is not present in META
+   * @throws IOException if the META existence check fails
+   */
+  public EnableTableHandler(Server server, byte [] tableName,
+      CatalogTracker catalogTracker, AssignmentManager assignmentManager)
+  throws TableNotFoundException, IOException {
+    super(server, EventType.C2M_ENABLE_TABLE);
+    this.tableName = tableName;
+    this.tableNameStr = Bytes.toString(tableName);
+    this.ct = catalogTracker;
+    this.assignmentManager = assignmentManager;
+    // Check if table exists
+    if (!MetaReader.tableExists(catalogTracker, this.tableNameStr)) {
+      throw new TableNotFoundException(this.tableNameStr);
+    }
+  }
+
+  @Override
+  public void process() {
+    try {
+      // Fixed log message typo: "Attemping" -> "Attempting".
+      LOG.info("Attempting to enable the table " + this.tableNameStr);
+      handleEnableTable();
+    } catch (IOException e) {
+      LOG.error("Error trying to enable the table " + this.tableNameStr, e);
+    }
+  }
+
+  private void handleEnableTable() throws IOException {
+    // Get the regions of this table
+    List<HRegionInfo> regions = MetaReader.getTableRegions(this.ct, tableName);
+    // Clear the disabled flag so regions of this table can be brought online.
+    // (Original comment said "Set the table as disabled" -- a copy/paste
+    // error from DisableTableHandler; undisableTable() enables the table.)
+    assignmentManager.undisableTable(this.tableNameStr);
+    // Assign every region of the table.
+    // (Original comment "Verify all regions ... are disabled" was also a
+    // copy/paste error; this loop assigns, it does not verify.)
+    for (HRegionInfo region : regions) {
+      assignmentManager.assign(region);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,52 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.catalog.MetaEditor;
+import org.apache.hadoop.hbase.master.MasterServices;
+
+/**
+ * Handler for the master's modify-table operation: writes the new table
+ * descriptor into each region's META entry and its on-disk region info.
+ */
+public class ModifyTableHandler extends TableEventHandler {
+  // The replacement descriptor to apply to every region of the table.
+  private final HTableDescriptor htd;
+
+  public ModifyTableHandler(final byte [] tableName,
+      final HTableDescriptor htd, final Server server,
+      final MasterServices masterServices) throws IOException {
+    super(EventType.C2M_MODIFY_TABLE, tableName, server, masterServices);
+    this.htd = htd;
+  }
+
+  @Override
+  protected void handleTableOperation(List<HRegionInfo> hris)
+  throws IOException {
+    for (final HRegionInfo regionInfo : hris) {
+      // Attach the new descriptor, then persist it to META and to the FS.
+      regionInfo.setTableDesc(this.htd);
+      MetaEditor.updateRegionInfo(this.server.getCatalogTracker(), regionInfo);
+      this.masterServices.getMasterFileSystem().updateRegionInfo(regionInfo);
+    }
+  }
+}
\ No newline at end of file

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java?rev=991397&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/master/handler/OpenedRegionHandler.java Tue Aug 31 23:51:44 2010
@@ -0,0 +1,100 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.handler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HServerInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.executor.EventHandler;
+import org.apache.hadoop.hbase.executor.RegionTransitionData;
+import org.apache.hadoop.hbase.master.AssignmentManager;
+import org.apache.hadoop.hbase.zookeeper.ZKAssign;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Handles OPENED region event on Master.
+ */
+public class OpenedRegionHandler extends EventHandler implements TotesHRegionInfo {
+  private static final Log LOG = LogFactory.getLog(OpenedRegionHandler.class);
+
+  private final AssignmentManager assignmentManager;
+  // NOTE(review): stored but never read in this class -- possibly kept for
+  // debugging or future use; confirm it is still needed.
+  private final RegionTransitionData data;
+  private final HRegionInfo regionInfo;
+  // Server that reported the region open; handed to regionOnline() below.
+  private final HServerInfo serverInfo;
+  // Processing priority: catalog regions (ROOT, META) before user regions.
+  private final OpenedPriority priority;
+
+  private enum OpenedPriority {
+    ROOT (1),
+    META (2),
+    USER (3);
+
+    private final int value;
+    OpenedPriority(int value) {
+      this.value = value;
+    }
+    public int getValue() {
+      return value;
+    }
+  };
+
+  /**
+   * @param server master server; supplies ZooKeeper access and abort()
+   * @param assignmentManager used to mark the region online
+   * @param data transition data from the region's unassigned ZK node
+   * @param regionInfo the region reported OPENED
+   * @param serverInfo the server that opened the region
+   */
+  public OpenedRegionHandler(Server server,
+      AssignmentManager assignmentManager, RegionTransitionData data,
+      HRegionInfo regionInfo, HServerInfo serverInfo) {
+    super(server, EventType.RS2ZK_REGION_OPENED);
+    this.assignmentManager = assignmentManager;
+    this.data = data;
+    this.regionInfo = regionInfo;
+    this.serverInfo = serverInfo;
+    if(regionInfo.isRootRegion()) {
+      priority = OpenedPriority.ROOT;
+    } else if(regionInfo.isMetaRegion()) {
+      priority = OpenedPriority.META;
+    } else {
+      priority = OpenedPriority.USER;
+    }
+  }
+
+  @Override
+  public int getPriority() {
+    return priority.getValue();
+  }
+
+  @Override
+  public HRegionInfo getHRegionInfo() {
+    return this.regionInfo;
+  }
+
+  @Override
+  public void process() {
+    LOG.debug("Handling OPENED event; deleting unassigned node");
+    // TODO: should we check if this table was disabled and get it closed?
+    // Remove region from in-memory transition and unassigned node from ZK
+    try {
+      ZKAssign.deleteOpenedNode(server.getZooKeeper(),
+          regionInfo.getEncodedName());
+    } catch (KeeperException e) {
+      // Failure to clean up the ZK node leaves assignment state inconsistent,
+      // so abort the master rather than continue.
+      server.abort("Error deleting OPENED node in ZK", e);
+    }
+    assignmentManager.regionOnline(regionInfo, serverInfo);
+    LOG.debug("Opened region " + regionInfo.getRegionNameAsString());
+  }
+}
\ No newline at end of file



Mime
View raw message