hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jg...@apache.org
Subject svn commit: r982489 [5/7] - in /hbase/branches/0.90_master_rewrite: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/catalog/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/executor/ s...
Date Thu, 05 Aug 2010 07:35:02 GMT
Modified: hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=982489&r1=982488&r2=982489&view=diff
==============================================================================
--- hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.90_master_rewrite/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Aug  5 07:35:00 2010
@@ -71,13 +71,13 @@ import org.apache.hadoop.hbase.Leases;
 import org.apache.hadoop.hbase.LocalHBaseCluster;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.ServerController;
 import org.apache.hadoop.hbase.UnknownRowLockException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.YouAreDeadException;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HMsg.Type;
 import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
+import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.MultiPut;
@@ -87,6 +87,8 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ServerConnection;
 import org.apache.hadoop.hbase.client.ServerConnectionManager;
+import org.apache.hadoop.hbase.executor.HBaseExecutorService;
+import org.apache.hadoop.hbase.executor.HBaseExecutorService.HBaseExecutorServiceType;
 import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -94,6 +96,12 @@ import org.apache.hadoop.hbase.ipc.HBase
 import org.apache.hadoop.hbase.ipc.HBaseServer;
 import org.apache.hadoop.hbase.ipc.HMasterRegionInterface;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.CloseRootHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
+import org.apache.hadoop.hbase.regionserver.handler.OpenRootHandler;
 import org.apache.hadoop.hbase.regionserver.metrics.RegionServerMetrics;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -112,26 +120,26 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.zookeeper.KeeperException;
 
 /**
- * HRegionServer makes a set of HRegions available to clients.  It checks in with
+ * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
-public class HRegionServer implements HRegionInterface,
-    HBaseRPCErrorHandler, Runnable, Stoppable, ServerController {
+public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
+    Runnable, Stoppable, RegionServerController {
   public static final Log LOG = LogFactory.getLog(HRegionServer.class);
   private static final HMsg REPORT_EXITING = new HMsg(Type.MSG_REPORT_EXITING);
   private static final HMsg REPORT_QUIESCED = new HMsg(Type.MSG_REPORT_QUIESCED);
-  private static final HMsg [] EMPTY_HMSG_ARRAY = new HMsg [] {};
+  private static final HMsg[] EMPTY_HMSG_ARRAY = new HMsg[] {};
 
   // Set when a report to the master comes back with a message asking us to
-  // shutdown.  Also set by call to stop when debugging or running unit tests
+  // shutdown. Also set by call to stop when debugging or running unit tests
   // of HRegionServer in isolation. We use AtomicBoolean rather than
-  // plain boolean so we can pass a reference to Chore threads.  Otherwise,
+  // plain boolean so we can pass a reference to Chore threads. Otherwise,
   // Chore threads need to know about the hosting class.
   protected final AtomicBoolean stopRequested = new AtomicBoolean(false);
 
   protected final AtomicBoolean quiesced = new AtomicBoolean(false);
 
-  // Go down hard.  Used if file system becomes unavailable and also in
+  // Go down hard. Used if file system becomes unavailable and also in
   // debugging and unit tests.
   protected volatile boolean abortRequested;
 
@@ -149,15 +157,14 @@ public class HRegionServer implements HR
   private Path rootDir;
   private final Random rand = new Random();
 
-  // Key is Bytes.hashCode of region name byte array and the value is HRegion
-  // in both of the maps below.  Use Bytes.mapKey(byte []) generating key for
-  // below maps.
-  protected final Map<Integer, HRegion> onlineRegions =
-    new ConcurrentHashMap<Integer, HRegion>();
+  /**
+   * Map of regions currently being served by this region server. Key is the
+   * encoded region name.
+   */
+  protected final Map<String, HRegion> onlineRegions = new ConcurrentHashMap<String, HRegion>();
 
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private final LinkedBlockingQueue<HMsg> outboundMsgs =
-    new LinkedBlockingQueue<HMsg>();
+  private final LinkedBlockingQueue<HMsg> outboundMsgs = new LinkedBlockingQueue<HMsg>();
 
   final int numRetries;
   protected final int threadWakeFrequency;
@@ -170,7 +177,7 @@ public class HRegionServer implements HR
   // Remote HMaster
   private HMasterRegionInterface hbaseMaster;
 
-  // Server to handle client requests.  Default access so can be accessed by
+  // Server to handle client requests. Default access so can be accessed by
   // unit tests.
   HBaseServer server;
 
@@ -180,7 +187,7 @@ public class HRegionServer implements HR
   // Request counter
   private volatile AtomicInteger requestCount = new AtomicInteger();
 
-  // Info server.  Default access so can be used by unit tests.  REGIONSERVER
+  // Info server. Default access so can be used by unit tests. REGIONSERVER
   // is name of the webapp and the attribute name used stuffing this instance
   // into web context.
   InfoServer infoServer;
@@ -189,11 +196,11 @@ public class HRegionServer implements HR
   public static final String REGIONSERVER = "regionserver";
 
   /*
-   * Space is reserved in HRS constructor and then released when aborting
-   * to recover from an OOME. See HBASE-706.  TODO: Make this percentage of the
-   * heap or a minimum.
+   * Space is reserved in HRS constructor and then released when aborting to
+   * recover from an OOME. See HBASE-706. TODO: Make this percentage of the heap
+   * or a minimum.
    */
-  private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();
+  private final LinkedList<byte[]> reservedSpace = new LinkedList<byte[]>();
 
   private RegionServerMetrics metrics;
 
@@ -203,11 +210,12 @@ public class HRegionServer implements HR
   // Cache flushing
   MemStoreFlusher cacheFlusher;
 
-  /* Check for major compactions.
+  /*
+   * Check for major compactions.
    */
   Chore majorCompactionChecker;
 
-  // HLog and HLog roller.  log is protected rather than private to avoid
+  // HLog and HLog roller. log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected volatile HLog hlog;
   LogRoller hlogRoller;
@@ -215,44 +223,49 @@ public class HRegionServer implements HR
   // flag set after we're done setting up server threads (used for testing)
   protected volatile boolean isOnline;
 
-  final Map<String, InternalScanner> scanners =
-    new ConcurrentHashMap<String, InternalScanner>();
+  final Map<String, InternalScanner> scanners = new ConcurrentHashMap<String, InternalScanner>();
 
   // zookeeper connection and watcher
   private ZooKeeperWatcher zooKeeper;
-  
-  // master address manager and watecher
+
+  // master address manager and watcher
   private MasterAddressManager masterAddressManager;
 
+  // catalog tracker
+  private CatalogTracker catalogTracker;
+
   // A sleeper that sleeps for msgInterval.
   private final Sleeper sleeper;
 
   private final long rpcTimeout;
 
-  // Address passed in to constructor.  This is not always the address we run
-  // with.  For example, if passed port is 0, then we are to pick a port.  The
+  // Address passed in to constructor. This is not always the address we run
+  // with. For example, if passed port is 0, then we are to pick a port. The
   // actual address we run with is in the #serverInfo data member.
   private final HServerAddress address;
 
   // The main region server thread.
+  @SuppressWarnings("unused")
   private Thread regionServerThread;
 
   private final String machineName;
 
   /**
    * Starts a HRegionServer at the default location
+   *
    * @param conf
    * @throws IOException
    */
   public HRegionServer(Configuration conf) throws IOException {
-    machineName = DNS.getDefaultHost(
-        conf.get("hbase.regionserver.dns.interface","default"),
-        conf.get("hbase.regionserver.dns.nameserver","default"));
-    String addressStr = machineName + ":" +
-      conf.get(HConstants.REGIONSERVER_PORT,
-          Integer.toString(HConstants.DEFAULT_REGIONSERVER_PORT));
-    // This is not necessarily the address we will run with.  The address we
-    // use will be in #serverInfo data member.  For example, we may have been
+    machineName = DNS.getDefaultHost(conf.get(
+        "hbase.regionserver.dns.interface", "default"), conf.get(
+        "hbase.regionserver.dns.nameserver", "default"));
+    String addressStr = machineName
+        + ":"
+        + conf.get(HConstants.REGIONSERVER_PORT, Integer
+            .toString(HConstants.DEFAULT_REGIONSERVER_PORT));
+    // This is not necessarily the address we will run with. The address we
+    // use will be in #serverInfo data member. For example, we may have been
     // passed a port of 0 which means we should pick some ephemeral port to bind
     // to.
     address = new HServerAddress(addressStr);
@@ -266,7 +279,7 @@ public class HRegionServer implements HR
     this.isOnline = false;
 
     // Config'ed params
-    this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
+    this.numRetries = conf.getInt("hbase.client.retries.number", 2);
     this.threadWakeFrequency = conf.getInt(HConstants.THREAD_WAKE_FREQUENCY,
         10 * 1000);
     this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 1 * 1000);
@@ -274,26 +287,27 @@ public class HRegionServer implements HR
     sleeper = new Sleeper(this.msgInterval, this.stopRequested);
 
     this.maxScannerResultSize = conf.getLong(
-            HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
-            HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
+        HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
 
     // Task thread to process requests from Master
     this.worker = new Worker();
 
-    this.numRegionsToReport =
-      conf.getInt("hbase.regionserver.numregionstoreport", 10);
+    this.numRegionsToReport = conf.getInt(
+        "hbase.regionserver.numregionstoreport", 10);
 
-    this.rpcTimeout =
-      conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-          HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
+    this.rpcTimeout = conf.getLong(
+        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD);
 
     initialize();
   }
 
   /**
    * Creates all of the state that needs to be reconstructed in case we are
-   * doing a restart. This is shared between the constructor and restart().
-   * Both call it.
+   * doing a restart. This is shared between the constructor and restart(). Both
+   * call it.
+   *
    * @throws IOException
    */
   private void initialize() throws IOException {
@@ -301,41 +315,68 @@ public class HRegionServer implements HR
     this.stopRequested.set(false);
 
     // Server to handle client requests
-    this.server = HBaseRPC.getServer(this, address.getBindAddress(),
-      address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
-      false, conf);
+    this.server = HBaseRPC.getServer(this, address.getBindAddress(), address
+        .getPort(), conf.getInt("hbase.regionserver.handler.count", 10), false,
+        conf);
     this.server.setErrorHandler(this);
     // Address is giving a default IP for the moment. Will be changed after
     // calling the master.
-    this.serverInfo = new HServerInfo(new HServerAddress(
-      new InetSocketAddress(address.getBindAddress(),
-      this.server.getListenerAddress().getPort())), System.currentTimeMillis(),
-      this.conf.getInt("hbase.regionserver.info.port", 60030), machineName);
+    this.serverInfo = new HServerInfo(new HServerAddress(new InetSocketAddress(
+        address.getBindAddress(), this.server.getListenerAddress().getPort())),
+        System.currentTimeMillis(), this.conf.getInt(
+            "hbase.regionserver.info.port", 60030), machineName);
     if (this.serverInfo.getServerAddress() == null) {
-      throw new NullPointerException("Server address cannot be null; " +
-        "hbase-958 debugging");
+      throw new NullPointerException("Server address cannot be null; "
+          + "hbase-958 debugging");
     }
-    initializeThreads();
     initializeZooKeeper();
+    initializeThreads();
     int nbBlocks = conf.getInt("hbase.regionserver.nbreservationblocks", 4);
-    for(int i = 0; i < nbBlocks; i++)  {
+    for (int i = 0; i < nbBlocks; i++) {
       reservedSpace.add(new byte[HConstants.DEFAULT_SIZE_RESERVATION_BLOCK]);
     }
   }
 
   private void initializeZooKeeper() throws IOException {
     // open connection to zookeeper and set primary watcher
-    zooKeeper = new ZooKeeperWatcher(conf, serverInfo.getServerName(), this);
-    
+    zooKeeper = new ZooKeeperWatcher(conf, REGIONSERVER + "-"
+        + serverInfo.getServerName(), this);
+
     // create the master address manager, register with zk, and start it
     masterAddressManager = new MasterAddressManager(zooKeeper, this);
     zooKeeper.registerListener(masterAddressManager);
-    masterAddressManager.monitorMaster();
+    masterAddressManager.start();
+
+    // create the catalog tracker and start it
+    this.catalogTracker = new CatalogTracker(zooKeeper, connection, this,
+        conf.getInt("hbase.regionserver.catalog.timeout", 30000));
+    catalogTracker.start();
   }
 
-  private void initializeThreads() {
+  private void initializeThreads() throws IOException {
     this.workerThread = new Thread(worker);
 
+    // Start executor services
+
+    HBaseExecutorServiceType.RS_OPEN_REGION.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.openregion.threads", 5));
+    HBaseExecutorServiceType.RS_OPEN_ROOT.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.openroot.threads", 1));
+    HBaseExecutorServiceType.RS_OPEN_META.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.openmeta.threads", 1));
+    HBaseExecutorServiceType.RS_CLOSE_REGION.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.closeregion.threads", 5));
+    HBaseExecutorServiceType.RS_CLOSE_ROOT.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.closeroot.threads", 1));
+    HBaseExecutorServiceType.RS_CLOSE_META.startExecutorService(
+        getServerName(),
+        conf.getInt("hbase.regionserver.executor.closemeta.threads", 1));
+
     // Cache flushing thread.
     this.cacheFlusher = new MemStoreFlusher(conf, this);
 
@@ -346,21 +387,21 @@ public class HRegionServer implements HR
     this.hlogRoller = new LogRoller(this);
 
     // Background thread to check for major compactions; needed if region
-    // has not gotten updates in a while.  Make it run at a lesser frequency.
-    int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY +
-        ".multiplier", 1000);
+    // has not gotten updates in a while. Make it run at a lesser frequency.
+    int multiplier = this.conf.getInt(HConstants.THREAD_WAKE_FREQUENCY
+        + ".multiplier", 1000);
     this.majorCompactionChecker = new MajorCompactionChecker(this,
-      this.threadWakeFrequency * multiplier,  this.stopRequested);
+        this.threadWakeFrequency * multiplier, this.stopRequested);
 
-    this.leases = new Leases(
-        (int) conf.getLong(HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
-            HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
+    this.leases = new Leases((int) conf.getLong(
+        HConstants.HBASE_REGIONSERVER_LEASE_PERIOD_KEY,
+        HConstants.DEFAULT_HBASE_REGIONSERVER_LEASE_PERIOD),
         this.threadWakeFrequency);
   }
 
   /**
-   * The HRegionServer sticks in this loop until closed. It repeatedly checks
-   * in with the HMaster, sending heartbeats & reports, and receiving HRegion
+   * The HRegionServer sticks in this loop until closed. It repeatedly checks in
+   * with the HMaster, sending heartbeats & reports, and receiving HRegion
    * load/unload instructions.
    */
   public void run() {
@@ -375,54 +416,54 @@ public class HRegionServer implements HR
           break;
         }
         sleeper.sleep();
-        LOG.warn("No response from master on reportForDuty. Sleeping and " +
-          "then trying again.");
+        LOG.warn("No response from master on reportForDuty. Sleeping and "
+            + "then trying again.");
       }
       List<HMsg> outboundMessages = new ArrayList<HMsg>();
       long lastMsg = 0;
       // Now ask master what it wants us to do and tell it what we have done
       for (int tries = 0; !stopRequested.get() && isHealthy();) {
-        // Try to get the root region location from the master.
+        // Try to get the root region location from zookeeper.
         if (!haveRootRegion.get()) {
-          HServerAddress rootServer = 
-            ZKUtil.getDataAsAddress(zooKeeper, zooKeeper.rootServerZNode);
+          HServerAddress rootServer = catalogTracker.getRootLocation();
           if (rootServer != null) {
-            // By setting the root region location, we bypass the wait imposed on
+            // By setting the root region location, we bypass the wait imposed
+            // on
             // HTable for all regions being assigned.
-            this.connection.setRootRegionLocation(
-                new HRegionLocation(HRegionInfo.ROOT_REGIONINFO, rootServer));
+            this.connection.setRootRegionLocation(new HRegionLocation(
+                HRegionInfo.ROOT_REGIONINFO, rootServer));
             haveRootRegion.set(true);
           }
         }
         long now = System.currentTimeMillis();
         // Drop into the send loop if msgInterval has elapsed or if something
-        // to send.  If we fail talking to the master, then we'll sleep below
+        // to send. If we fail talking to the master, then we'll sleep below
         // on poll of the outboundMsgs blockingqueue.
         if ((now - lastMsg) >= msgInterval || !outboundMessages.isEmpty()) {
           try {
             doMetrics();
-            MemoryUsage memory =
-              ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-            HServerLoad hsl = new HServerLoad(requestCount.get(),
-              (int)(memory.getUsed()/1024/1024),
-              (int)(memory.getMax()/1024/1024));
-            for (HRegion r: onlineRegions.values()) {
+            MemoryUsage memory = ManagementFactory.getMemoryMXBean()
+                .getHeapMemoryUsage();
+            HServerLoad hsl = new HServerLoad(requestCount.get(), (int) (memory
+                .getUsed() / 1024 / 1024),
+                (int) (memory.getMax() / 1024 / 1024));
+            for (HRegion r : onlineRegions.values()) {
               hsl.addRegionInfo(createRegionLoad(r));
             }
             this.serverInfo.setLoad(hsl);
             this.requestCount.set(0);
             addOutboundMsgs(outboundMessages);
-            HMsg msgs[] = this.hbaseMaster.regionServerReport(
-              serverInfo, outboundMessages.toArray(EMPTY_HMSG_ARRAY),
-              getMostLoadedRegions());
+            HMsg msgs[] = this.hbaseMaster.regionServerReport(serverInfo,
+                outboundMessages.toArray(EMPTY_HMSG_ARRAY),
+                getMostLoadedRegions());
             lastMsg = System.currentTimeMillis();
             updateOutboundMsgs(outboundMessages);
             outboundMessages.clear();
             if (this.quiesced.get() && onlineRegions.size() == 0) {
               // We've just told the master we're exiting because we aren't
               // serving any regions. So set the stop bit and exit.
-              LOG.info("Server quiesced and not serving any regions. " +
-                "Starting shutdown");
+              LOG.info("Server quiesced and not serving any regions. "
+                  + "Starting shutdown");
               stopRequested.set(true);
               this.outboundMsgs.clear();
               continue;
@@ -430,38 +471,36 @@ public class HRegionServer implements HR
 
             // Queue up the HMaster's instruction stream for processing
             boolean restart = false;
-            for(int i = 0;
-                !restart && !stopRequested.get() && i < msgs.length;
-                i++) {
+            for (int i = 0; !restart && !stopRequested.get() && i < msgs.length; i++) {
               LOG.info(msgs[i].toString());
               this.connection.unsetRootRegionLocation();
-              switch(msgs[i].getType()) {
+              switch (msgs[i].getType()) {
 
-              case MSG_REGIONSERVER_STOP:
-                stopRequested.set(true);
-                break;
+                case MSG_REGIONSERVER_STOP:
+                  stopRequested.set(true);
+                  break;
 
-              case MSG_REGIONSERVER_QUIESCE:
-                if (!quiesceRequested) {
-                  try {
-                    toDo.put(new ToDoEntry(msgs[i]));
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException("Putting into msgQueue was " +
-                        "interrupted.", e);
+                case MSG_REGIONSERVER_QUIESCE:
+                  if (!quiesceRequested) {
+                    try {
+                      toDo.put(new ToDoEntry(msgs[i]));
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException("Putting into msgQueue was "
+                          + "interrupted.", e);
+                    }
+                    quiesceRequested = true;
                   }
-                  quiesceRequested = true;
-                }
-                break;
+                  break;
 
-              default:
-                if (fsOk) {
-                  try {
-                    toDo.put(new ToDoEntry(msgs[i]));
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException("Putting into msgQueue was " +
-                        "interrupted.", e);
+                default:
+                  if (fsOk) {
+                    try {
+                      toDo.put(new ToDoEntry(msgs[i]));
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException("Putting into msgQueue was "
+                          + "interrupted.", e);
+                    }
                   }
-                }
               }
             }
             // Reset tries count if we had a successful transaction.
@@ -493,17 +532,17 @@ public class HRegionServer implements HR
             }
             LOG.warn("Attempt=" + tries, e);
             // No point retrying immediately; this is probably connection to
-            // master issue.  Doing below will cause us to sleep.
+            // master issue. Doing below will cause us to sleep.
             lastMsg = System.currentTimeMillis();
           }
         }
         now = System.currentTimeMillis();
         HMsg msg = this.outboundMsgs.poll((msgInterval - (now - lastMsg)),
-          TimeUnit.MILLISECONDS);
+            TimeUnit.MILLISECONDS);
         // If we got something, add it to list of things to send.
-        if (msg != null) outboundMessages.add(msg);
-        // Do some housekeeping before going back around
-        housekeeping();
+        if (msg != null) {
+          outboundMessages.add(msg);
+        }
       } // for
     } catch (Throwable t) {
       if (!checkOOME(t)) {
@@ -522,11 +561,13 @@ public class HRegionServer implements HR
       }
     }
     // Send cache a shutdown.
-    LruBlockCache c = (LruBlockCache)StoreFile.getBlockCache(this.conf);
-    if (c != null) c.shutdown();
+    LruBlockCache c = (LruBlockCache) StoreFile.getBlockCache(this.conf);
+    if (c != null) {
+      c.shutdown();
+    }
 
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
-    // TODO: Should we check they are alive?  If OOME could have exited already
+    // TODO: Should we check they are alive? If OOME could have exited already
     cacheFlusher.interruptIfNecessary();
     compactSplitThread.interruptIfNecessary();
     hlogRoller.interruptIfNecessary();
@@ -543,8 +584,8 @@ public class HRegionServer implements HR
             LOG.info("On abort, closed hlog");
           }
         } catch (Throwable e) {
-          LOG.error("Unable to close log in abort",
-            RemoteExceptionHandler.checkThrowable(e));
+          LOG.error("Unable to close log in abort", RemoteExceptionHandler
+              .checkThrowable(e));
         }
         closeAllRegions(); // Don't leave any open file handles
       }
@@ -556,25 +597,26 @@ public class HRegionServer implements HR
           hlog.closeAndDelete();
         }
       } catch (Throwable e) {
-        LOG.error("Close and delete failed",
-          RemoteExceptionHandler.checkThrowable(e));
+        LOG.error("Close and delete failed", RemoteExceptionHandler
+            .checkThrowable(e));
       }
       try {
         HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
         exitMsg[0] = REPORT_EXITING;
         // Tell the master what regions we are/were serving
         int i = 1;
-        for (HRegion region: closedRegions) {
-          exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE,
-              region.getRegionInfo());
+        for (HRegion region : closedRegions) {
+          exitMsg[i++] = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, region
+              .getRegionInfo());
         }
 
-        LOG.info("telling master that region server is shutting down at: " +
-            serverInfo.getServerName());
-        hbaseMaster.regionServerReport(serverInfo, exitMsg, (HRegionInfo[])null);
+        LOG.info("telling master that region server is shutting down at: "
+            + serverInfo.getServerName());
+        hbaseMaster.regionServerReport(serverInfo, exitMsg,
+            (HRegionInfo[]) null);
       } catch (Throwable e) {
         LOG.warn("Failed to send exiting message to master: ",
-          RemoteExceptionHandler.checkThrowable(e));
+            RemoteExceptionHandler.checkThrowable(e));
       }
       LOG.info("stopping server at: " + this.serverInfo.getServerName());
     }
@@ -586,7 +628,7 @@ public class HRegionServer implements HR
     }
 
     this.zooKeeper.close();
-    
+
     if (!killed) {
       join();
     }
@@ -595,6 +637,7 @@ public class HRegionServer implements HR
 
   /*
    * Add to the passed <code>msgs</code> messages to pass to the master.
+   *
    * @param msgs Current outboundMsgs array; we'll add messages to this List.
    */
   private void addOutboundMsgs(final List<HMsg> msgs) {
@@ -602,8 +645,8 @@ public class HRegionServer implements HR
       this.outboundMsgs.drainTo(msgs);
       return;
     }
-    OUTER: for (HMsg m: this.outboundMsgs) {
-      for (HMsg mm: msgs) {
+    OUTER: for (HMsg m : this.outboundMsgs) {
+      for (HMsg mm : msgs) {
         // Be careful don't add duplicates.
         if (mm.equals(m)) {
           continue OUTER;
@@ -615,12 +658,15 @@ public class HRegionServer implements HR
 
   /*
    * Remove from this.outboundMsgs those messsages we sent the master.
+   *
    * @param msgs Messages we sent the master.
    */
   private void updateOutboundMsgs(final List<HMsg> msgs) {
-    if (msgs.isEmpty()) return;
-    for (HMsg m: this.outboundMsgs) {
-      for (HMsg mm: msgs) {
+    if (msgs.isEmpty()) {
+      return;
+    }
+    for (HMsg m : this.outboundMsgs) {
+      for (HMsg mm : msgs) {
         if (mm.equals(m)) {
           this.outboundMsgs.remove(m);
           break;
@@ -631,11 +677,12 @@ public class HRegionServer implements HR
 
   /*
    * Run init. Sets up hlog and starts up all server threads.
+   *
    * @param c Extra configuration.
    */
   protected void init(final MapWritable c) throws IOException {
     try {
-      for (Map.Entry<Writable, Writable> e: c.entrySet()) {
+      for (Map.Entry<Writable, Writable> e : c.entrySet()) {
         String key = e.getKey().toString();
         String value = e.getValue().toString();
         if (LOG.isDebugEnabled()) {
@@ -646,21 +693,23 @@ public class HRegionServer implements HR
       // Master may have sent us a new address with the other configs.
       // Update our address in this case. See HBASE-719
       String hra = conf.get("hbase.regionserver.address");
-      // TODO: The below used to be this.address != null.  Was broken by what
+      // TODO: The below used to be this.address != null. Was broken by what
       // looks like a mistake in:
       //
-      // HBASE-1215 migration; metautils scan of meta region was broken; wouldn't see first row
+      // HBASE-1215 migration; metautils scan of meta region was broken;
+      // wouldn't see first row
       // ------------------------------------------------------------------------
-      // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38 lines
+      // r796326 | stack | 2009-07-21 07:40:34 -0700 (Tue, 21 Jul 2009) | 38
+      // lines
       if (hra != null) {
-        HServerAddress hsa = new HServerAddress (hra,
-          this.serverInfo.getServerAddress().getPort());
-        LOG.info("Master passed us address to use. Was=" +
-          this.serverInfo.getServerAddress() + ", Now=" + hra);
+        HServerAddress hsa = new HServerAddress(hra, this.serverInfo
+            .getServerAddress().getPort());
+        LOG.info("Master passed us address to use. Was="
+            + this.serverInfo.getServerAddress() + ", Now=" + hra);
         this.serverInfo.setServerAddress(hsa);
       }
       // Master sent us hbase.rootdir to use. Should be fully qualified
-      // path with file system specification included.  Set 'fs.defaultFS'
+      // path with file system specification included. Set 'fs.defaultFS'
       // to match the filesystem on hbase.rootdir else underlying hadoop hdfs
       // accessors will be going against wrong filesystem (unless all is set
       // to defaults).
@@ -677,13 +726,15 @@ public class HRegionServer implements HR
       this.isOnline = false;
       this.stopRequested.set(true);
       throw convertThrowableToIOE(cleanup(e, "Failed init"),
-        "Region server startup failed");
+          "Region server startup failed");
     }
   }
 
   /*
    * @param r Region to get RegionLoad for.
+   *
    * @return RegionLoad instance.
+   *
    * @throws IOException
    */
   private HServerLoad.RegionLoad createRegionLoad(final HRegion r) {
@@ -691,20 +742,18 @@ public class HRegionServer implements HR
     int stores = 0;
     int storefiles = 0;
     int storefileSizeMB = 0;
-    int memstoreSizeMB = (int)(r.memstoreSize.get()/1024/1024);
+    int memstoreSizeMB = (int) (r.memstoreSize.get() / 1024 / 1024);
     int storefileIndexSizeMB = 0;
     synchronized (r.stores) {
       stores += r.stores.size();
-      for (Store store: r.stores.values()) {
+      for (Store store : r.stores.values()) {
         storefiles += store.getStorefilesCount();
-        storefileSizeMB +=
-          (int)(store.getStorefilesSize()/1024/1024);
-        storefileIndexSizeMB +=
-          (int)(store.getStorefilesIndexSize()/1024/1024);
+        storefileSizeMB += (int) (store.getStorefilesSize() / 1024 / 1024);
+        storefileIndexSizeMB += (int) (store.getStorefilesIndexSize() / 1024 / 1024);
       }
     }
     return new HServerLoad.RegionLoad(name, stores, storefiles,
-      storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB);
+        storefileSizeMB, memstoreSizeMB, storefileIndexSizeMB);
   }
 
   /**
@@ -712,14 +761,16 @@ public class HRegionServer implements HR
    * @return An instance of RegionLoad.
    * @throws IOException
    */
-  public HServerLoad.RegionLoad createRegionLoad(final byte [] regionName) {
+  public HServerLoad.RegionLoad createRegionLoad(final byte[] regionName) {
     return createRegionLoad(this.onlineRegions.get(Bytes.mapKey(regionName)));
   }
 
   /*
-   * Cleanup after Throwable caught invoking method.  Converts <code>t</code>
-   * to IOE if it isn't already.
+   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+   * IOE if it isn't already.
+   *
    * @param t Throwable
+   *
    * @return Throwable converted to an IOE; methods can only let out IOEs.
    */
   private Throwable cleanup(final Throwable t) {
@@ -727,10 +778,13 @@ public class HRegionServer implements HR
   }
 
   /*
-   * Cleanup after Throwable caught invoking method.  Converts <code>t</code>
-   * to IOE if it isn't already.
+   * Cleanup after Throwable caught invoking method. Converts <code>t</code> to
+   * IOE if it isn't already.
+   *
    * @param t Throwable
-   * @param msg Message to log in error.  Can be null.
+   *
+   * @param msg Message to log in error. Can be null.
+   *
    * @return Throwable converted to an IOE; methods can only let out IOEs.
    */
   private Throwable cleanup(final Throwable t, final String msg) {
@@ -747,6 +801,7 @@ public class HRegionServer implements HR
 
   /*
    * @param t
+   *
    * @return Make <code>t</code> an IOE if it isn't already.
    */
   private IOException convertThrowableToIOE(final Throwable t) {
@@ -755,36 +810,38 @@ public class HRegionServer implements HR
 
   /*
    * @param t
+   *
    * @param msg Message to put in new IOE if passed <code>t</code> is not an IOE
+   *
    * @return Make <code>t</code> an IOE if it isn't already.
    */
-  private IOException convertThrowableToIOE(final Throwable t,
-      final String msg) {
-    return (t instanceof IOException? (IOException)t:
-      msg == null || msg.length() == 0?
-        new IOException(t): new IOException(msg, t));
+  private IOException convertThrowableToIOE(final Throwable t, final String msg) {
+    return (t instanceof IOException ? (IOException) t : msg == null
+        || msg.length() == 0 ? new IOException(t) : new IOException(msg, t));
   }
+
   /*
    * Check if an OOME and if so, call abort.
+   *
    * @param e
+   *
    * @return True if we OOME'd and are aborting.
    */
   public boolean checkOOME(final Throwable e) {
     boolean stop = false;
-    if (e instanceof OutOfMemoryError ||
-      (e.getCause() != null && e.getCause() instanceof OutOfMemoryError) ||
-      (e.getMessage() != null &&
-        e.getMessage().contains("java.lang.OutOfMemoryError"))) {
+    if (e instanceof OutOfMemoryError
+        || (e.getCause() != null && e.getCause() instanceof OutOfMemoryError)
+        || (e.getMessage() != null && e.getMessage().contains(
+            "java.lang.OutOfMemoryError"))) {
       abort("OutOfMemoryError, aborting", e);
       stop = true;
     }
     return stop;
   }
 
-
   /**
-   * Checks to see if the file system is still accessible.
-   * If not, sets abortRequested and stopRequested
+   * Checks to see if the file system is still accessible. If not, sets
+   * abortRequested and stopRequested
    *
    * @return false if file system is not available
    */
@@ -807,8 +864,8 @@ public class HRegionServer implements HR
   private static class MajorCompactionChecker extends Chore {
     private final HRegionServer instance;
 
-    MajorCompactionChecker(final HRegionServer h,
-        final int sleepTime, final AtomicBoolean stopper) {
+    MajorCompactionChecker(final HRegionServer h, final int sleepTime,
+        final AtomicBoolean stopper) {
       super("MajorCompactionChecker", sleepTime, stopper);
       this.instance = h;
       LOG.info("Runs every " + sleepTime + "ms");
@@ -816,14 +873,12 @@ public class HRegionServer implements HR
 
     @Override
     protected void chore() {
-      Set<Integer> keys = this.instance.onlineRegions.keySet();
-      for (Integer i: keys) {
-        HRegion r = this.instance.onlineRegions.get(i);
+      for (HRegion r : this.instance.onlineRegions.values()) {
         try {
           if (r != null && r.isMajorCompaction()) {
-            // Queue a compaction.  Will recognize if major is needed.
-            this.instance.compactSplitThread.
-              compactionRequested(r, getName() + " requests major compaction");
+            // Queue a compaction. Will recognize if major is needed.
+            this.instance.compactSplitThread.compactionRequested(r, getName()
+                + " requests major compaction");
           }
         } catch (IOException e) {
           LOG.warn("Failed major compaction check on " + r, e);
@@ -833,9 +888,10 @@ public class HRegionServer implements HR
   }
 
   /**
-   * Report the status of the server. A server is online once all the startup
-   * is completed (setting up filesystem, starting service threads, etc.). This
+   * Report the status of the server. A server is online once all the startup is
+   * completed (setting up filesystem, starting service threads, etc.). This
    * method is designed mostly to be useful in tests.
+   *
    * @return true if online, false if not.
    */
   public boolean isOnline() {
@@ -849,22 +905,22 @@ public class HRegionServer implements HR
       LOG.debug("Log dir " + logdir);
     }
     if (fs.exists(logdir)) {
-      throw new RegionServerRunningException("region server already " +
-        "running at " + this.serverInfo.getServerName() +
-        " because logdir " + logdir.toString() + " exists");
+      throw new RegionServerRunningException("region server already "
+          + "running at " + this.serverInfo.getServerName()
+          + " because logdir " + logdir.toString() + " exists");
     }
     HLog newlog = instantiateHLog(logdir, oldLogDir);
     return newlog;
   }
 
   // instantiate
-  protected HLog instantiateHLog(Path logdir, Path oldLogDir) throws IOException {
+  protected HLog instantiateHLog(Path logdir, Path oldLogDir)
+      throws IOException {
     HLog newlog = new HLog(fs, logdir, oldLogDir, conf, hlogRoller, null,
         serverInfo.getServerAddress().toString());
     return newlog;
   }
 
-
   protected LogRoller getLogRoller() {
     return hlogRoller;
   }
@@ -884,19 +940,19 @@ public class HRegionServer implements HR
     this.metrics.regions.set(this.onlineRegions.size());
     this.metrics.incrementRequests(this.requestCount.get());
     // Is this too expensive every three seconds getting a lock on onlineRegions
-    // and then per store carried?  Can I make metrics be sloppier and avoid
+    // and then per store carried? Can I make metrics be sloppier and avoid
     // the synchronizations?
     int stores = 0;
     int storefiles = 0;
     long memstoreSize = 0;
     long storefileIndexSize = 0;
     synchronized (this.onlineRegions) {
-      for (Map.Entry<Integer, HRegion> e: this.onlineRegions.entrySet()) {
+      for (Map.Entry<String, HRegion> e : this.onlineRegions.entrySet()) {
         HRegion r = e.getValue();
         memstoreSize += r.memstoreSize.get();
         synchronized (r.stores) {
           stores += r.stores.size();
-          for(Map.Entry<byte [], Store> ee: r.stores.entrySet()) {
+          for (Map.Entry<byte[], Store> ee : r.stores.entrySet()) {
             Store store = ee.getValue();
             storefiles += store.getStorefilesCount();
             storefileIndexSize += store.getStorefilesIndexSize();
@@ -906,12 +962,13 @@ public class HRegionServer implements HR
     }
     this.metrics.stores.set(stores);
     this.metrics.storefiles.set(storefiles);
-    this.metrics.memstoreSizeMB.set((int)(memstoreSize/(1024*1024)));
-    this.metrics.storefileIndexSizeMB.set((int)(storefileIndexSize/(1024*1024)));
-    this.metrics.compactionQueueSize.set(compactSplitThread.
-      getCompactionQueueSize());
+    this.metrics.memstoreSizeMB.set((int) (memstoreSize / (1024 * 1024)));
+    this.metrics.storefileIndexSizeMB
+        .set((int) (storefileIndexSize / (1024 * 1024)));
+    this.metrics.compactionQueueSize.set(compactSplitThread
+        .getCompactionQueueSize());
 
-    LruBlockCache lruBlockCache = (LruBlockCache)StoreFile.getBlockCache(conf);
+    LruBlockCache lruBlockCache = (LruBlockCache) StoreFile.getBlockCache(conf);
     if (lruBlockCache != null) {
       this.metrics.blockCacheCount.set(lruBlockCache.size());
       this.metrics.blockCacheFree.set(lruBlockCache.getFreeSize());
@@ -932,14 +989,14 @@ public class HRegionServer implements HR
   /*
    * Start maintanence Threads, Server, Worker and lease checker threads.
    * Install an UncaughtExceptionHandler that calls abort of RegionServer if we
-   * get an unhandled exception.  We cannot set the handler on all threads.
-   * Server's internal Listener thread is off limits.  For Server, if an OOME,
-   * it waits a while then retries.  Meantime, a flush or a compaction that
-   * tries to run should trigger same critical condition and the shutdown will
-   * run.  On its way out, this server will shut down Server.  Leases are sort
-   * of inbetween. It has an internal thread that while it inherits from
-   * Chore, it keeps its own internal stop mechanism so needs to be stopped
-   * by this hosting server.  Worker logs the exception and exits.
+   * get an unhandled exception. We cannot set the handler on all threads.
+   * Server's internal Listener thread is off limits. For Server, if an OOME, it
+   * waits a while then retries. Meantime, a flush or a compaction that tries to
+   * run should trigger same critical condition and the shutdown will run. On
+   * its way out, this server will shut down Server. Leases are sort of
+   * inbetween. It has an internal thread that while it inherits from Chore, it
+   * keeps its own internal stop mechanism so needs to be stopped by this
+   * hosting server. Worker logs the exception and exits.
    */
   private void startServiceThreads() throws IOException {
     String n = Thread.currentThread().getName();
@@ -948,17 +1005,16 @@ public class HRegionServer implements HR
         abort("Uncaught exception in service thread " + t.getName(), e);
       }
     };
-    Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller",
-        handler);
+    Threads.setDaemonThreadRunning(this.hlogRoller, n + ".logRoller", handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
-      handler);
+        handler);
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
         handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
-    Threads.setDaemonThreadRunning(this.majorCompactionChecker,
-        n + ".majorCompactionChecker", handler);
+    Threads.setDaemonThreadRunning(this.majorCompactionChecker, n
+        + ".majorCompactionChecker", handler);
 
-    // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
+    // Leases is not a Thread. Internally it runs a daemon thread. If it gets
     // an unhandled exception, it will just exit.
     this.leases.setName(n + ".leaseChecker");
     this.leases.start();
@@ -966,7 +1022,8 @@ public class HRegionServer implements HR
     int port = this.conf.getInt("hbase.regionserver.info.port", 60030);
     // -1 is for disabling info server
     if (port >= 0) {
-      String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0");
+      String addr = this.conf.get("hbase.regionserver.info.bindAddress",
+          "0.0.0.0");
       // check if auto port bind enabled
       boolean auto = this.conf.getBoolean("hbase.regionserver.info.port.auto",
           false);
@@ -977,7 +1034,7 @@ public class HRegionServer implements HR
           this.infoServer.start();
           break;
         } catch (BindException e) {
-          if (!auto){
+          if (!auto) {
             // auto bind disabled throw BindException
             throw e;
           }
@@ -986,17 +1043,17 @@ public class HRegionServer implements HR
           port++;
           // update HRS server info port.
           this.serverInfo = new HServerInfo(this.serverInfo.getServerAddress(),
-            this.serverInfo.getStartCode(),  port,
-            this.serverInfo.getHostname());
+              this.serverInfo.getStartCode(), port, this.serverInfo
+                  .getHostname());
         }
       }
     }
 
-    // Start Server.  This service is like leases in that it internally runs
+    // Start Server. This service is like leases in that it internally runs
     // a thread.
     this.server.start();
-    LOG.info("HRegionServer started at: " +
-      this.serverInfo.getServerAddress().toString());
+    LOG.info("HRegionServer started at: "
+        + this.serverInfo.getServerAddress().toString());
   }
 
   /*
@@ -1008,9 +1065,9 @@ public class HRegionServer implements HR
       return false;
     }
     // Verify that all threads are alive
-    if (!(leases.isAlive() && compactSplitThread.isAlive() &&
-        cacheFlusher.isAlive() && hlogRoller.isAlive() &&
-        workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
+    if (!(leases.isAlive() && compactSplitThread.isAlive()
+        && cacheFlusher.isAlive() && hlogRoller.isAlive()
+        && workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       // One or more threads are no longer alive - shut down
       stop();
       return false;
@@ -1018,58 +1075,32 @@ public class HRegionServer implements HR
     return true;
   }
 
-  /*
-   * Run some housekeeping tasks.
-   */
-  private void housekeeping() {
-    // If the todo list has > 0 messages, iterate looking for open region
-    // messages. Send the master a message that we're working on its
-    // processing so it doesn't assign the region elsewhere.
-    if (this.toDo.isEmpty()) {
-      return;
-    }
-    // This iterator isn't safe if elements are gone and HRS.Worker could
-    // remove them (it already checks for null there). Goes from oldest.
-    for (ToDoEntry e: this.toDo) {
-      if(e == null) {
-        LOG.warn("toDo gave a null entry during iteration");
-        break;
-      }
-      HMsg msg = e.msg;
-      if (msg != null) {
-        if (msg.isType(HMsg.Type.MSG_REGION_OPEN)) {
-          addProcessingMessage(msg.getRegionInfo());
-        }
-      } else {
-        LOG.warn("Message is empty: " + e);
-      }
-    }
-  }
-
   /** @return the HLog */
   public HLog getLog() {
     return this.hlog;
   }
 
   /**
-   * Sets a flag that will cause all the HRegionServer threads to shut down
-   * in an orderly fashion.  Used by unit tests.
+   * Sets a flag that will cause all the HRegionServer threads to shut down in
+   * an orderly fashion. Used by unit tests.
    */
   public void stop() {
     this.stopRequested.set(true);
-    synchronized(this) {
+    synchronized (this) {
       // Wakes run() if it is sleeping
       notifyAll(); // FindBugs NN_NAKED_NOTIFY
     }
   }
 
   /**
-   * Cause the server to exit without closing the regions it is serving, the
-   * log it is using and without notifying the master.
-   * Used unit testing and on catastrophic events such as HDFS is yanked out
-   * from under hbase or we OOME.
-   * @param reason the reason we are aborting
-   * @param cause the exception that caused the abort, or null
+   * Cause the server to exit without closing the regions it is serving, the log
+   * it is using and without notifying the master. Used in unit testing and on
+   * catastrophic events such as HDFS is yanked out from under hbase or we OOME.
+   *
+   * @param reason
+   *          the reason we are aborting
+   * @param cause
+   *          the exception that caused the abort, or null
    */
   public void abort(String reason, Throwable cause) {
     if (cause != null) {
@@ -1084,7 +1115,7 @@ public class HRegionServer implements HR
     }
     stop();
   }
-  
+
   /**
    * @see HRegionServer#abort(String, Throwable)
    */
@@ -1093,9 +1124,9 @@ public class HRegionServer implements HR
   }
 
   /*
-   * Simulate a kill -9 of this server.
-   * Exits w/o closing regions or cleaninup logs but it does close socket in
-   * case want to bring up server on old hostname+port immediately.
+   * Simulate a kill -9 of this server. Exits w/o closing regions or cleaning
+   * up logs but it does close socket in case we want to bring up server on old
+   * hostname+port immediately.
    */
   protected void kill() {
     this.killed = true;
@@ -1103,8 +1134,8 @@ public class HRegionServer implements HR
   }
 
   /**
-   * Wait on all threads to finish.
-   * Presumption is that all closes and stops have already been called.
+   * Wait on all threads to finish. Presumption is that all closes and stops
+   * have already been called.
    */
   protected void join() {
     Threads.shutdown(this.majorCompactionChecker);
@@ -1112,20 +1143,21 @@ public class HRegionServer implements HR
     Threads.shutdown(this.cacheFlusher);
     Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
+    HBaseExecutorService.shutdown();
   }
 
   /**
    * Get the current master from ZooKeeper and open the RPC connection to it.
-   * 
-   * Method will block until a master is available.  You can break from this
+   *
+   * Method will block until a master is available. You can break from this
    * block by requesting the server stop.
-   * 
+   *
    * @return
    */
   private boolean getMaster() {
     HServerAddress masterAddress = null;
-    while((masterAddress = masterAddressManager.getMasterAddress()) == null) {
-      if(stopRequested.get()) {
+    while ((masterAddress = masterAddressManager.getMasterAddress()) == null) {
+      if (stopRequested.get()) {
         return false;
       }
       LOG.debug("No master found, will retry");
@@ -1135,11 +1167,12 @@ public class HRegionServer implements HR
     HMasterRegionInterface master = null;
     while (!stopRequested.get() && master == null) {
       try {
-        // Do initial RPC setup.  The final argument indicates that the RPC
+        // Do initial RPC setup. The final argument indicates that the RPC
         // should retry indefinitely.
-        master = (HMasterRegionInterface)HBaseRPC.waitForProxy(
-          HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
-          masterAddress.getInetSocketAddress(), this.conf, -1, this.rpcTimeout);
+        master = (HMasterRegionInterface) HBaseRPC.waitForProxy(
+            HMasterRegionInterface.class, HBaseRPCProtocolVersion.versionID,
+            masterAddress.getInetSocketAddress(), this.conf, -1,
+            this.rpcTimeout);
       } catch (IOException e) {
         LOG.warn("Unable to connect to master. Retrying. Error was:", e);
         sleeper.sleep();
@@ -1150,8 +1183,8 @@ public class HRegionServer implements HR
   }
 
   /*
-   * Let the master know we're here
-   * Run initialization using parameters passed us by the master.
+   * Let the master know we're here. Run initialization using parameters passed
+   * us by the master.
    */
   private MapWritable reportForDuty() {
     while (!stopRequested.get() && !getMaster()) {
@@ -1161,20 +1194,21 @@ public class HRegionServer implements HR
 
     MapWritable result = null;
     long lastMsg = 0;
-    while(!stopRequested.get()) {
+    while (!stopRequested.get()) {
       try {
         this.requestCount.set(0);
-        MemoryUsage memory =
-          ManagementFactory.getMemoryMXBean().getHeapMemoryUsage();
-        HServerLoad hsl = new HServerLoad(0, (int)memory.getUsed()/1024/1024,
-          (int)memory.getMax()/1024/1024);
+        MemoryUsage memory = ManagementFactory.getMemoryMXBean()
+            .getHeapMemoryUsage();
+        HServerLoad hsl = new HServerLoad(0,
+            (int) memory.getUsed() / 1024 / 1024,
+            (int) memory.getMax() / 1024 / 1024);
         this.serverInfo.setLoad(hsl);
-        if (LOG.isDebugEnabled())
+        if (LOG.isDebugEnabled()) {
           LOG.debug("sending initial server load: " + hsl);
+        }
         lastMsg = System.currentTimeMillis();
-        ZKUtil.setAddressAndWatch(zooKeeper,
-            ZKUtil.joinZNode(zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)),
-            address);
+        ZKUtil.setAddressAndWatch(zooKeeper, ZKUtil.joinZNode(
+            zooKeeper.rsZNode, ZKUtil.getNodeName(serverInfo)), address);
         result = this.hbaseMaster.regionServerStartup(this.serverInfo);
         break;
       } catch (IOException e) {
@@ -1199,16 +1233,16 @@ public class HRegionServer implements HR
    */
   void reportSplit(HRegionInfo oldRegion, HRegionInfo newRegionA,
       HRegionInfo newRegionB) {
-    this.outboundMsgs.add(new HMsg(HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS,
-      oldRegion, newRegionA, newRegionB,
-      Bytes.toBytes("Daughters; " +
-          newRegionA.getRegionNameAsString() + ", " +
-          newRegionB.getRegionNameAsString())));
+    this.outboundMsgs.add(new HMsg(
+        HMsg.Type.MSG_REPORT_SPLIT_INCLUDES_DAUGHTERS, oldRegion, newRegionA,
+        newRegionB, Bytes.toBytes("Daughters; "
+            + newRegionA.getRegionNameAsString() + ", "
+            + newRegionB.getRegionNameAsString())));
   }
 
-  //////////////////////////////////////////////////////////////////////////////
+  // ////////////////////////////////////////////////////////////////////////////
   // HMaster-given operations
-  //////////////////////////////////////////////////////////////////////////////
+  // ////////////////////////////////////////////////////////////////////////////
 
   /*
    * Data structure to hold a HMsg and retries count.
@@ -1229,90 +1263,65 @@ public class HRegionServer implements HR
   /** Thread that performs long running requests from the master */
   class Worker implements Runnable {
     void stop() {
-      synchronized(toDo) {
+      synchronized (toDo) {
         toDo.notifyAll();
       }
     }
 
     public void run() {
       try {
-        while(!stopRequested.get()) {
+        while (!stopRequested.get()) {
           ToDoEntry e = null;
           try {
             e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-            if(e == null || stopRequested.get()) {
+            if (e == null || stopRequested.get()) {
               continue;
             }
             LOG.info("Worker: " + e.msg);
             HRegion region = null;
             HRegionInfo info = e.msg.getRegionInfo();
-            switch(e.msg.getType()) {
+            switch (e.msg.getType()) {
 
-            case MSG_REGIONSERVER_QUIESCE:
-              closeUserRegions();
-              break;
-
-            case MSG_REGION_OPEN:
-              // Open a region
-              if (!haveRootRegion.get() && !info.isRootRegion()) {
-                // root region is not online yet. requeue this task
-                LOG.info("putting region open request back into queue because" +
-                    " root region is not yet available");
-                try {
-                  toDo.put(e);
-                } catch (InterruptedException ex) {
-                  LOG.warn("insertion into toDo queue was interrupted", ex);
-                  break;
-                }
-              }
-              openRegion(info);
-              break;
+              case MSG_REGIONSERVER_QUIESCE:
+                closeUserRegions();
+                break;
 
-            case MSG_REGION_CLOSE:
-              // Close a region
-              closeRegion(e.msg.getRegionInfo(), true);
-              break;
-
-            case MSG_REGION_CLOSE_WITHOUT_REPORT:
-              // Close a region, don't reply
-              closeRegion(e.msg.getRegionInfo(), false);
-              break;
-
-            case MSG_REGION_SPLIT:
-              region = getRegion(info.getRegionName());
-              region.flushcache();
-              region.shouldSplit(true);
-              // force a compaction; split will be side-effect.
-              compactSplitThread.compactionRequested(region,
-                e.msg.getType().name());
-              break;
-
-            case MSG_REGION_MAJOR_COMPACT:
-            case MSG_REGION_COMPACT:
-              // Compact a region
-              region = getRegion(info.getRegionName());
-              compactSplitThread.compactionRequested(region,
-                e.msg.isType(Type.MSG_REGION_MAJOR_COMPACT),
-                e.msg.getType().name());
-              break;
-
-            case MSG_REGION_FLUSH:
-              region = getRegion(info.getRegionName());
-              region.flushcache();
-              break;
-
-            case TESTING_MSG_BLOCK_RS:
-              while (!stopRequested.get()) {
-                Threads.sleep(1000);
-                LOG.info("Regionserver blocked by " +
-                  HMsg.Type.TESTING_MSG_BLOCK_RS + "; " + stopRequested.get());
-              }
-              break;
+              case MSG_REGION_SPLIT:
+                region = getRegion(info.getRegionName());
+                region.flushcache();
+                region.shouldSplit(true);
+                // force a compaction; split will be side-effect.
+                compactSplitThread.compactionRequested(region, e.msg.getType()
+                    .name());
+                break;
+
+              case MSG_REGION_MAJOR_COMPACT:
+              case MSG_REGION_COMPACT:
+                // Compact a region
+                region = getRegion(info.getRegionName());
+                compactSplitThread.compactionRequested(region, e.msg
+                    .isType(Type.MSG_REGION_MAJOR_COMPACT), e.msg.getType()
+                    .name());
+                break;
+
+              case MSG_REGION_FLUSH:
+                region = getRegion(info.getRegionName());
+                region.flushcache();
+                break;
+
+              case TESTING_MSG_BLOCK_RS:
+                while (!stopRequested.get()) {
+                  Threads.sleep(1000);
+                  LOG.info("Regionserver blocked by "
+                      + HMsg.Type.TESTING_MSG_BLOCK_RS + "; "
+                      + stopRequested.get());
+                }
+                break;
 
-            default:
-              throw new AssertionError(
-                  "Impossible state during msg processing.  Instruction: "
-                  + e.msg.toString());
+              default:
+                throw new AssertionError(
+                    "Impossible state during msg processing.  Instruction: "
+                        + e.msg.toString());
             }
           } catch (InterruptedException ex) {
             LOG.warn("Processing Worker queue", ex);
@@ -1320,25 +1329,25 @@ public class HRegionServer implements HR
             if (ex instanceof IOException) {
               ex = RemoteExceptionHandler.checkIOException((IOException) ex);
             }
-            if(e != null && e.tries.get() < numRetries) {
+            if (e != null && e.tries.get() < numRetries) {
               LOG.warn(ex);
               e.tries.incrementAndGet();
               try {
                 toDo.put(e);
               } catch (InterruptedException ie) {
-                throw new RuntimeException("Putting into msgQueue was " +
-                    "interrupted.", ex);
+                throw new RuntimeException("Putting into msgQueue was "
+                    + "interrupted.", ex);
               }
             } else {
-              LOG.error("unable to process message" +
-                  (e != null ? (": " + e.msg.toString()) : ""), ex);
+              LOG.error("unable to process message"
+                  + (e != null ? (": " + e.msg.toString()) : ""), ex);
               if (!checkFileSystem()) {
                 break;
               }
             }
           }
         }
-      } catch(Throwable t) {
+      } catch (Throwable t) {
         if (!checkOOME(t)) {
           LOG.fatal("Unhandled exception", t);
         }
@@ -1348,109 +1357,32 @@ public class HRegionServer implements HR
     }
   }
 
-  void openRegion(final HRegionInfo regionInfo) {
-    Integer mapKey = Bytes.mapKey(regionInfo.getRegionName());
-    HRegion region = this.onlineRegions.get(mapKey);
-    RSZookeeperUpdater zkUpdater = 
-      new RSZookeeperUpdater(zooKeeper, serverInfo.getServerName(),
-          regionInfo.getEncodedName());
-    if (region == null) {
-      try {
-        zkUpdater.startRegionOpenEvent(null, true);
-        region = instantiateRegion(regionInfo, this.hlog);
-        // Startup a compaction early if one is needed, if region has references
-        // or if a store has too many store files
-        if (region.hasReferences() || region.hasTooManyStoreFiles()) {
-          this.compactSplitThread.compactionRequested(region,
-            region.hasReferences() ? "Region has references on open" :
-                                     "Region has too many store files");
-        }
-      } catch (Throwable e) {
-        Throwable t = cleanup(e,
-          "Error opening " + regionInfo.getRegionNameAsString());
-        // TODO: add an extra field in HRegionInfo to indicate that there is
-        // an error. We can't do that now because that would be an incompatible
-        // change that would require a migration
-        try {
-          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, 
-                               regionInfo, 
-                               StringUtils.stringifyException(t).getBytes());
-          zkUpdater.abortOpenRegion(hmsg);
-        } catch (IOException e1) {
-          // TODO: Can we recover? Should be throw RTE?
-          LOG.error("Failed to abort open region " + regionInfo.getRegionNameAsString(), e1);
-        }
-        return;
-      }
-      this.lock.writeLock().lock();
-      try {
-        this.onlineRegions.put(mapKey, region);
-      } finally {
-        this.lock.writeLock().unlock();
-      }
-    }
-    try {
-      HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_OPEN, regionInfo);
-      zkUpdater.finishRegionOpenEvent(hmsg);
-    } catch (IOException e) {
-      LOG.error("Failed to mark region " + regionInfo.getRegionNameAsString() + " as opened", e);
-    }
-  }
-
   /*
    * @param regionInfo RegionInfo for the Region we're to instantiate and
    * initialize.
+   *
    * @param wal Set into here the regions' seqid.
+   *
+   * @param reporter periodic callback
+   *
    * @return
+   *
    * @throws IOException
    */
-  protected HRegion instantiateRegion(final HRegionInfo regionInfo, final HLog wal)
-  throws IOException {
-    Path dir =
-      HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc().getName());
+  public HRegion instantiateRegion(final HRegionInfo regionInfo,
+      final HLog wal, Progressable reporter) throws IOException {
+    Path dir = HTableDescriptor.getTableDir(rootDir, regionInfo.getTableDesc()
+        .getName());
     HRegion r = HRegion.newHRegion(dir, this.hlog, this.fs, conf, regionInfo,
-      this.cacheFlusher);
-    long seqid = r.initialize(new Progressable() {
-      public void progress() {
-        addProcessingMessage(regionInfo);
-      }
-    });
+        this.cacheFlusher);
+    long seqid = r.initialize(reporter);
     // If a wal and its seqid is < that of new region, use new regions seqid.
     if (wal != null) {
-      if (seqid > wal.getSequenceNumber()) wal.setSequenceNumber(seqid);
-    }
-    return r;
-  }
-
-  /**
-   * Add a MSG_REPORT_PROCESS_OPEN to the outbound queue.
-   * This method is called while region is in the queue of regions to process
-   * and then while the region is being opened, it is called from the Worker
-   * thread that is running the region open.
-   * @param hri Region to add the message for
-   */
-  public void addProcessingMessage(final HRegionInfo hri) {
-    getOutboundMsgs().add(new HMsg(HMsg.Type.MSG_REPORT_PROCESS_OPEN, hri));
-  }
-
-  protected void closeRegion(final HRegionInfo hri, final boolean reportWhenCompleted)
-  throws IOException {
-    RSZookeeperUpdater zkUpdater = null;
-    if(reportWhenCompleted) {
-      zkUpdater = new RSZookeeperUpdater(zooKeeper,
-          serverInfo.getServerName(), hri.getEncodedName());
-      zkUpdater.startRegionCloseEvent(null, false);
-    }
-    HRegion region = this.removeFromOnlineRegions(hri);
-    if (region != null) {
-      region.close();
-      if(reportWhenCompleted) {
-        if(zkUpdater != null) {
-          HMsg hmsg = new HMsg(HMsg.Type.MSG_REPORT_CLOSE, hri, null);
-          zkUpdater.finishRegionCloseEvent(hmsg);
-        }
+      if (seqid > wal.getSequenceNumber()) {
+        wal.setSequenceNumber(seqid);
       }
     }
+    return r;
   }
 
   /** Called either when the master tells us to restart or from stop() */
@@ -1463,21 +1395,22 @@ public class HRegionServer implements HR
     } finally {
       this.lock.writeLock().unlock();
     }
-    // Close any outstanding scanners.  Means they'll get an UnknownScanner
+    // Close any outstanding scanners. Means they'll get an UnknownScanner
     // exception next time they come in.
-    for (Map.Entry<String, InternalScanner> e: this.scanners.entrySet()) {
+    for (Map.Entry<String, InternalScanner> e : this.scanners.entrySet()) {
       try {
         e.getValue().close();
       } catch (IOException ioe) {
         LOG.warn("Closing scanner " + e.getKey(), ioe);
       }
     }
-    for (HRegion region: regionsToClose) {
+    for (HRegion region : regionsToClose) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("closing region " + Bytes.toString(region.getRegionName()));
       }
       try {
-        region.close(abortRequested);
+        new CloseRegionHandler(this, region.getRegionInfo(), abortRequested)
+            .execute();
       } catch (Throwable e) {
         cleanup(e, "Error closing " + Bytes.toString(region.getRegionName()));
       }
@@ -1505,7 +1438,7 @@ public class HRegionServer implements HR
         r.close();
       } catch (Throwable e) {
         LOG.error("Error closing region " + r.toString(),
-          RemoteExceptionHandler.checkThrowable(e));
+            RemoteExceptionHandler.checkThrowable(e));
       }
     }
   }
@@ -1516,9 +1449,9 @@ public class HRegionServer implements HR
     this.lock.writeLock().lock();
     try {
       synchronized (onlineRegions) {
-        for (Iterator<Map.Entry<Integer, HRegion>> i =
-            onlineRegions.entrySet().iterator(); i.hasNext();) {
-          Map.Entry<Integer, HRegion> e = i.next();
+        for (Iterator<Map.Entry<String, HRegion>> i = onlineRegions.entrySet()
+            .iterator(); i.hasNext();) {
+          Map.Entry<String, HRegion> e = i.next();
           HRegion r = e.getValue();
           if (!r.getRegionInfo().isMetaRegion()) {
             regionsToClose.add(r);
@@ -1560,16 +1493,14 @@ public class HRegionServer implements HR
   // HRegionInterface
   //
 
-  public HRegionInfo getRegionInfo(final byte [] regionName)
-  throws NotServingRegionException {
+  public HRegionInfo getRegionInfo(final byte[] regionName)
+      throws NotServingRegionException {
     requestCount.incrementAndGet();
     return getRegion(regionName).getRegionInfo();
   }
 
-
-  public Result getClosestRowBefore(final byte [] regionName,
-    final byte [] row, final byte [] family)
-  throws IOException {
+  public Result getClosestRowBefore(final byte[] regionName, final byte[] row,
+      final byte[] family) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
@@ -1585,33 +1516,33 @@ public class HRegionServer implements HR
   }
 
   /** {@inheritDoc} */
-  public Result get(byte [] regionName, Get get) throws IOException {
+  public Result get(byte[] regionName, Get get) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
       return region.get(get, getLockFromId(get.getLockId()));
-    } catch(Throwable t) {
+    } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public boolean exists(byte [] regionName, Get get) throws IOException {
+  public boolean exists(byte[] regionName, Get get) throws IOException {
     checkOpen();
     requestCount.incrementAndGet();
     try {
       HRegion region = getRegion(regionName);
       Result r = region.get(get, getLockFromId(get.getLockId()));
       return r != null && !r.isEmpty();
-    } catch(Throwable t) {
+    } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-  public void put(final byte [] regionName, final Put put)
-  throws IOException {
-    if (put.getRow() == null)
+  public void put(final byte[] regionName, final Put put) throws IOException {
+    if (put.getRow() == null) {
       throw new IllegalArgumentException("update has null row");
+    }
 
     checkOpen();
     this.requestCount.incrementAndGet();
@@ -1628,7 +1559,7 @@ public class HRegionServer implements HR
   }
 
   public int put(final byte[] regionName, final List<Put> puts)
-  throws IOException {
+      throws IOException {
     checkOpen();
     HRegion region = null;
     try {
@@ -1636,21 +1567,22 @@ public class HRegionServer implements HR
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      
+
       @SuppressWarnings("unchecked")
       Pair<Put, Integer>[] putsWithLocks = new Pair[puts.size()];
-      
+
       int i = 0;
       for (Put p : puts) {
         Integer lock = getLockFromId(p.getLockId());
         putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
       }
-      
+
       this.requestCount.addAndGet(puts.size());
       OperationStatusCode[] codes = region.put(putsWithLocks);
       for (i = 0; i < codes.length; i++) {
-        if (codes[i] != OperationStatusCode.SUCCESS)
+        if (codes[i] != OperationStatusCode.SUCCESS) {
           return i;
+        }
       }
       return -1;
     } catch (Throwable t) {
@@ -1658,8 +1590,8 @@ public class HRegionServer implements HR
     }
   }
 
-  private boolean checkAndMutate(final byte[] regionName, final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
+  private boolean checkAndMutate(final byte[] regionName, final byte[] row,
+      final byte[] family, final byte[] qualifier, final byte[] value,
       final Writable w, Integer lock) throws IOException {
     checkOpen();
     this.requestCount.incrementAndGet();
@@ -1668,28 +1600,28 @@ public class HRegionServer implements HR
       if (!region.getRegionInfo().isMetaTable()) {
         this.cacheFlusher.reclaimMemStoreMemory();
       }
-      return region.checkAndMutate(row, family, qualifier, value, w, lock,
-          true);
+      return region
+          .checkAndMutate(row, family, qualifier, value, w, lock, true);
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
-
   /**
    *
    * @param regionName
    * @param row
    * @param family
    * @param qualifier
-   * @param value the expected value
+   * @param value
+   *          the expected value
    * @param put
    * @throws IOException
    * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndPut(final byte[] regionName, final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Put put) throws IOException{
+  public boolean checkAndPut(final byte[] regionName, final byte[] row,
+      final byte[] family, final byte[] qualifier, final byte[] value,
+      final Put put) throws IOException {
     return checkAndMutate(regionName, row, family, qualifier, value, put,
         getLockFromId(put.getLockId()));
   }
@@ -1700,14 +1632,15 @@ public class HRegionServer implements HR
    * @param row
    * @param family
    * @param qualifier
-   * @param value the expected value
+   * @param value
+   *          the expected value
    * @param delete
    * @throws IOException
    * @return true if the new put was execute, false otherwise
    */
-  public boolean checkAndDelete(final byte[] regionName, final byte [] row,
-      final byte [] family, final byte [] qualifier, final byte [] value,
-      final Delete delete) throws IOException{
+  public boolean checkAndDelete(final byte[] regionName, final byte[] row,
+      final byte[] family, final byte[] qualifier, final byte[] value,
+      final Delete delete) throws IOException {
     return checkAndMutate(regionName, row, family, qualifier, value, delete,
         getLockFromId(delete.getLockId()));
   }
@@ -1716,8 +1649,7 @@ public class HRegionServer implements HR
   // remote scanner interface
   //
 
-  public long openScanner(byte [] regionName, Scan scan)
-  throws IOException {
+  public long openScanner(byte[] regionName, Scan scan) throws IOException {
     checkOpen();
     NullPointerException npe = null;
     if (regionName == null) {
@@ -1742,20 +1674,19 @@ public class HRegionServer implements HR
     scannerId = rand.nextLong();
     String scannerName = String.valueOf(scannerId);
     scanners.put(scannerName, s);
-    this.leases.
-      createLease(scannerName, new ScannerListener(scannerName));
+    this.leases.createLease(scannerName, new ScannerListener(scannerName));
     return scannerId;
   }
 
   public Result next(final long scannerId) throws IOException {
-    Result [] res = next(scannerId, 1);
-    if(res == null || res.length == 0) {
+    Result[] res = next(scannerId, 1);
+    if (res == null || res.length == 0) {
       return null;
     }
     return res[0];
   }
 
-  public Result [] next(final long scannerId, int nbRows) throws IOException {
+  public Result[] next(final long scannerId, int nbRows) throws IOException {
     try {
       String scannerName = String.valueOf(scannerId);
       InternalScanner s = this.scanners.get(scannerName);
@@ -1774,7 +1705,8 @@ public class HRegionServer implements HR
       List<Result> results = new ArrayList<Result>(nbRows);
       long currentScanResultSize = 0;
       List<KeyValue> values = new ArrayList<KeyValue>();
-      for (int i = 0; i < nbRows && currentScanResultSize < maxScannerResultSize; i++) {
+      for (int i = 0; i < nbRows
+          && currentScanResultSize < maxScannerResultSize; i++) {
         requestCount.incrementAndGet();
         // Collect values to be returned here
         boolean moreRows = s.next(values);
@@ -1790,13 +1722,13 @@ public class HRegionServer implements HR
         values.clear();
       }
       // Below is an ugly hack where we cast the InternalScanner to be a
-      // HRegion.RegionScanner.  The alternative is to change InternalScanner
+      // HRegion.RegionScanner. The alternative is to change InternalScanner
       // interface but its used everywhere whereas we just need a bit of info
       // from HRegion.RegionScanner, IF its filter if any is done with the scan
-      // and wants to tell the client to stop the scan.  This is done by passing
+      // and wants to tell the client to stop the scan. This is done by passing
       // a null result.
-      return ((HRegion.RegionScanner)s).isFilterDone() && results.isEmpty()?
-        null: results.toArray(new Result[0]);
+      return ((HRegion.RegionScanner) s).isFilterDone() && results.isEmpty() ? null
+          : results.toArray(new Result[0]);
     } catch (Throwable t) {
       if (t instanceof NotServingRegionException) {
         String scannerName = String.valueOf(scannerId);
@@ -1822,8 +1754,8 @@ public class HRegionServer implements HR
   }
 
   /**
-   * Instantiated as a scanner lease.
-   * If the lease times out, the scanner is closed
+   * Instantiated as a scanner lease. If the lease times out, the scanner is
+   * closed.
    */
   private class ScannerListener implements LeaseListener {
     private final String scannerName;
@@ -1848,8 +1780,8 @@ public class HRegionServer implements HR
   //
   // Methods that do the actual work for the remote API
   //
-  public void delete(final byte [] regionName, final Delete delete)
-  throws IOException {
+  public void delete(final byte[] regionName, final Delete delete)
+      throws IOException {
     checkOpen();
     try {
       boolean writeToWAL = true;
@@ -1866,7 +1798,7 @@ public class HRegionServer implements HR
   }
 
   public int delete(final byte[] regionName, final List<Delete> deletes)
-  throws IOException {
+      throws IOException {
     // Count of Deletes processed.
     int i = 0;
     checkOpen();
@@ -1879,7 +1811,7 @@ public class HRegionServer implements HR
       }
       int size = deletes.size();
       Integer[] locks = new Integer[size];
-      for (Delete delete: deletes) {
+      for (Delete delete : deletes) {
         this.requestCount.incrementAndGet();
         locks[i] = getLockFromId(delete.getLockId());
         region.delete(delete, locks[i], writeToWAL);
@@ -1896,16 +1828,15 @@ public class HRegionServer implements HR
     return -1;
   }
 
-  public long lockRow(byte [] regionName, byte [] row)
-  throws IOException {
+  public long lockRow(byte[] regionName, byte[] row) throws IOException {
     checkOpen();
     NullPointerException npe = null;
-    if(regionName == null) {
+    if (regionName == null) {
       npe = new NullPointerException("regionName is null");
-    } else if(row == null) {
+    } else if (row == null) {
       npe = new NullPointerException("row to lock is null");
     }
-    if(npe != null) {
+    if (npe != null) {
       IOException io = new IOException("Invalid arguments to lockRow");
       io.initCause(npe);
       throw io;
@@ -1914,34 +1845,36 @@ public class HRegionServer implements HR
     try {
       HRegion region = getRegion(regionName);
       Integer r = region.obtainRowLock(row);
-      long lockId = addRowLock(r,region);
+      long lockId = addRowLock(r, region);
       LOG.debug("Row lock " + lockId + " explicitly acquired by client");
       return lockId;
     } catch (Throwable t) {
-      throw convertThrowableToIOE(cleanup(t,
-        "Error obtaining row lock (fsOk: " + this.fsOk + ")"));
+      throw convertThrowableToIOE(cleanup(t, "Error obtaining row lock (fsOk: "
+          + this.fsOk + ")"));
     }
   }
 
-  protected long addRowLock(Integer r, HRegion region) throws LeaseStillHeldException {
+  protected long addRowLock(Integer r, HRegion region)
+      throws LeaseStillHeldException {
     long lockId = -1L;
     lockId = rand.nextLong();
     String lockName = String.valueOf(lockId);
     rowlocks.put(lockName, r);
-    this.leases.
-      createLease(lockName, new RowLockListener(lockName, region));
+    this.leases.createLease(lockName, new RowLockListener(lockName, region));
     return lockId;
   }
 
   /**
-   * Method to get the Integer lock identifier used internally
-   * from the long lock identifier used by the client.
-   * @param lockId long row lock identifier from client
+   * Method to get the Integer lock identifier used internally from the long
+   * lock identifier used by the client.
+   *
+   * @param lockId
+   *          long row lock identifier from client
    * @return intId Integer row lock used internally in HRegion
-   * @throws IOException Thrown if this is not a valid client lock id.
+   * @throws IOException
+   *           Thrown if this is not a valid client lock id.
    */
-  Integer getLockFromId(long lockId)
-  throws IOException {
+  Integer getLockFromId(long lockId) throws IOException {
     if (lockId == -1L) {
       return null;
     }
@@ -1954,16 +1887,15 @@ public class HRegionServer implements HR
     return rl;
   }
 
-  public void unlockRow(byte [] regionName, long lockId)
-  throws IOException {
+  public void unlockRow(byte[] regionName, long lockId) throws IOException {
     checkOpen();
     NullPointerException npe = null;
-    if(regionName == null) {
+    if (regionName == null) {
       npe = new NullPointerException("regionName is null");
-    } else if(lockId == -1L) {
+    } else if (lockId == -1L) {
       npe = new NullPointerException("lockId is null");
     }
-    if(npe != null) {
+    if (npe != null) {
       IOException io = new IOException("Invalid arguments to unlockRow");
       io.initCause(npe);
       throw io;
@@ -1973,31 +1905,30 @@ public class HRegionServer implements HR
       HRegion region = getRegion(regionName);
       String lockName = String.valueOf(lockId);
       Integer r = rowlocks.remove(lockName);
-      if(r == null) {
+      if (r == null) {
         throw new UnknownRowLockException(lockName);
       }
       region.releaseRowLock(r);
       this.leases.cancelLease(lockName);
-      LOG.debug("Row lock " + lockId + " has been explicitly released by client");
+      LOG.debug("Row lock " + lockId
+          + " has been explicitly released by client");
     } catch (Throwable t) {
       throw convertThrowableToIOE(cleanup(t));
     }
   }
 
   @Override
-  public void bulkLoadHFile(
-      String hfilePath, byte[] regionName, byte[] familyName)
-  throws IOException {
+  public void bulkLoadHFile(String hfilePath, byte[] regionName,
+      byte[] familyName) throws IOException {
     HRegion region = getRegion(regionName);
     region.bulkLoadHFile(hfilePath, familyName);
   }
 
-  Map<String, Integer> rowlocks =
-    new ConcurrentHashMap<String, Integer>();
+  Map<String, Integer> rowlocks = new ConcurrentHashMap<String, Integer>();
 
   /**
-   * Instantiated as a row lock lease.
-   * If the lease times out, the row lock is released
+   * Instantiated as a row lock lease. If the lease times out, the row lock is
+   * released.
    */
   private class RowLockListener implements LeaseListener {
     private final String lockName;
@@ -2011,12 +1942,80 @@ public class HRegionServer implements HR
     public void leaseExpired() {
       LOG.info("Row Lock " + this.lockName + " lease expired");
       Integer r = rowlocks.remove(this.lockName);
-      if(r != null) {
+      if (r != null) {
         region.releaseRowLock(r);
       }
     }
   }
 
+  // Region open/close direct RPCs
+
+  @Override
+  public void openRegion(HRegionInfo region) {
+    LOG.info("Received request to open region: "
+        + region.getRegionNameAsString());
+    if(region.isRootRegion()) {
+      new OpenRootHandler(this, catalogTracker, region).submit();
+    } else if(region.isMetaRegion()) {
+      new OpenMetaHandler(this, catalogTracker, region).submit();
+    } else {
+      new OpenRegionHandler(this, catalogTracker, region).submit();
+    }
+  }
+
+  @Override
+  public boolean closeRegion(HRegionInfo region)
+      throws NotServingRegionException {
+    LOG.info("Received request to close region: "
+        + region.getRegionNameAsString());
+    // TODO: Need to check if this is being served here but currently undergoing
+    // a split (so master needs to retry close after split is complete)
+    if (!onlineRegions.containsKey(region.getEncodedName())) {
+      LOG.warn("Received close for region we are not serving");
+      throw new NotServingRegionException("Received close for "
+          + region.getRegionNameAsString() + " but we are not serving it");
+    }
+    if(region.isRootRegion()) {
+      new CloseRootHandler(this, region).submit();
+    } else if(region.isMetaRegion()) {
+      new CloseMetaHandler(this, region).submit();
+    } else {
+      new CloseRegionHandler(this, region).submit();
+    }
+    return true;
+  }
+
+  // Manual remote region administration RPCs
+
+  @Override
+  public void flushRegion(HRegionInfo regionInfo)
+      throws NotServingRegionException, IOException {
+    HRegion region = getRegion(regionInfo.getRegionName());
+    region.flushcache();
+  }
+
+  @Override
+  public void splitRegion(HRegionInfo regionInfo)
+      throws NotServingRegionException, IOException {
+    HRegion region = getRegion(regionInfo.getRegionName());
+    region.flushcache();
+    region.shouldSplit(true);
+    // force a compaction, split will be side-effect
+    // TODO: flush/compact/split refactor will make it trivial to do this
+    // sync/async (and won't require us to do a compaction to split!)
+    compactSplitThread.compactionRequested(region, "User-triggered split");
+  }
+
+  @Override
+  public void compactRegion(HRegionInfo regionInfo, boolean major)
+      throws NotServingRegionException, IOException {
+    HRegion region = getRegion(regionInfo.getRegionName());
+    region.flushcache();
+    // Compaction only: unlike splitRegion(), do not force a split here.
+    compactSplitThread.compactionRequested(region, major, "User-triggered "
+        + (major ? "major " : "") + "compaction");
+  }
+
   /** @return the info server */
   public InfoServer getInfoServer() {
     return infoServer;
@@ -2049,7 +2048,7 @@ public class HRegionServer implements HR
     return Collections.unmodifiableCollection(onlineRegions.values());
   }
 
-  public HRegion [] getOnlineRegionsAsArray() {
+  public HRegion[] getOnlineRegionsAsArray() {
     return getOnlineRegions().toArray(new HRegion[0]);
   }
 
@@ -2058,21 +2057,34 @@ public class HRegionServer implements HR
    */
   public SortedSet<HRegionInfo> getSortedOnlineRegionInfos() {
     SortedSet<HRegionInfo> result = new TreeSet<HRegionInfo>();
-    synchronized(this.onlineRegions) {
-      for (HRegion r: this.onlineRegions.values()) {
+    synchronized (this.onlineRegions) {
+      for (HRegion r : this.onlineRegions.values()) {
         result.add(r.getRegionInfo());
       }
     }
     return result;
   }
 
+  @Override
+  public void addToOnlineRegions(HRegion region) {
+    lock.writeLock().lock();
+    try {
+      onlineRegions.put(region.getRegionInfo().getEncodedName(), region);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
   /**
-   * This method removes HRegion corresponding to hri from the Map of onlineRegions.
+   * This method removes HRegion corresponding to hri from the Map of
+   * onlineRegions.
    *
-   * @param hri the HRegionInfo corresponding to the HRegion to-be-removed.
-   * @return the removed HRegion, or null if the HRegion was not in onlineRegions.
+   * @param hri
+   *          the HRegionInfo corresponding to the HRegion to-be-removed.
+   * @return the removed HRegion, or null if the HRegion was not in
+   *         onlineRegions.
    */
-  HRegion removeFromOnlineRegions(HRegionInfo hri) {
+  public HRegion removeFromOnlineRegions(HRegionInfo hri) {
     this.lock.writeLock().lock();
     HRegion toReturn = null;
     try {
@@ -2085,7 +2097,7 @@ public class HRegionServer implements HR
 
   /**
    * @return A new Map of online regions sorted by region size with the first
-   * entry being the biggest.
+   *         entry being the biggest.
    */
   public SortedMap<Long, HRegion> getCopyOfOnlineRegionsSortedBySize() {
     // we'll sort the regions in reverse
@@ -2106,11 +2118,20 @@ public class HRegionServer implements HR
 
   /**
    * @param regionName
-   * @return HRegion for the passed <code>regionName</code> or null if named
-   * region is not member of the online regions.
+   * @return HRegion for the passed <code>encodedRegionName</code> or null if
+   *         the named region is not a member of the online regions.
+   */
+  public HRegion getOnlineRegion(final String encodedRegionName) {
+    return onlineRegions.get(encodedRegionName);
+  }
+
+  /**
+   * @param regionName
+   * @return HRegion for the passed binary <code>regionName</code> or null if
+   *         the named region is not a member of the online regions.
    */
-  public HRegion getOnlineRegion(final byte [] regionName) {
-    return onlineRegions.get(Bytes.mapKey(regionName));
+  public HRegion getOnlineRegion(final byte[] regionName) {
+    return getOnlineRegion(HRegionInfo.encodeRegionName(regionName));
   }
 
   /** @return the request count */
@@ -2125,16 +2146,18 @@ public class HRegionServer implements HR
 
   /**
    * Protected utility method for safely obtaining an HRegion handle.
-   * @param regionName Name of online {@link HRegion} to return
+   *
+   * @param regionName
+   *          Name of online {@link HRegion} to return
    * @return {@link HRegion} for <code>regionName</code>
    * @throws NotServingRegionException
    */
-  protected HRegion getRegion(final byte [] regionName)
-  throws NotServingRegionException {
+  protected HRegion getRegion(final byte[] regionName)
+      throws NotServingRegionException {
     HRegion region = null;
     this.lock.readLock().lock();
     try {
-      region = onlineRegions.get(Integer.valueOf(Bytes.hashCode(regionName)));
+      region = getOnlineRegion(regionName);
       if (region == null) {
         throw new NotServingRegionException(regionName);
       }

[... 286 lines stripped ...]


Mime
View raw message