hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r575928 [1/2] - in /lucene/hadoop/trunk/src/contrib/hbase: ./ bin/ src/java/org/apache/hadoop/hbase/ src/test/org/apache/hadoop/hbase/
Date Sat, 15 Sep 2007 15:14:55 GMT
Author: stack
Date: Sat Sep 15 08:14:53 2007
New Revision: 575928

URL: http://svn.apache.org/viewvc?rev=575928&view=rev
Log:
HADOOP-1813 OOME makes zombie of region server

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/Leases.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/RemoteExceptionHandler.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MultiRegionTable.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestCleanRegionServerExit.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Sep 15 08:14:53 2007
@@ -38,6 +38,7 @@
                 (Ning Li via Stack)
     HADOOP-1800 output should default utf8 encoding
     HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
+    HADOOP-1813 OOME makes zombie of region server
     HADOOP-1814	TestCleanRegionServerExit fails too often on Hudson
     HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
     HADOOP-1832 listTables() returns duplicate tables

Modified: lucene/hadoop/trunk/src/contrib/hbase/bin/hbase
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/bin/hbase?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/bin/hbase (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/bin/hbase Sat Sep 15 08:14:53 2007
@@ -206,7 +206,11 @@
   CLASS=$COMMAND
 fi
 
-
+# Have JVM dump heap if we run out of memory.  Files will be in 'launch directory'
+# and are named like the following: java_pid21612.hprof. Apparently it doesn't
+# 'cost' to have this flag enabled. It's a 1.6 flag only. See:
+# http://blogs.sun.com/alanb/entry/outofmemoryerror_looks_a_bit_better 
+HBASE_OPTS="$HBASE_OPTS -XX:+HeapDumpOnOutOfMemoryError"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.dir=$HADOOP_LOG_DIR"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.log.file=$HADOOP_LOGFILE"
 HBASE_OPTS="$HBASE_OPTS -Dhadoop.home.dir=$HADOOP_HOME"

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HLog.java Sat Sep 15 08:14:53 2007
@@ -219,7 +219,6 @@
         // cache-flush.  Otherwise, the log sequence number for
         // the CACHEFLUSH operation will appear in a "newer" log file
         // than it should.
-        
         while(insideCacheFlush) {
           try {
             wait();
@@ -402,14 +401,14 @@
    * @see #completeCacheFlush(Text, Text, long)
    */
   synchronized long startCacheFlush() {
-    while (insideCacheFlush) {
+    while (this.insideCacheFlush) {
       try {
         wait();
       } catch (InterruptedException ie) {
         // continue
       }
     }
-    insideCacheFlush = true;
+    this.insideCacheFlush = true;
     notifyAll();
     return obtainSeqNum();
   }
@@ -427,7 +426,7 @@
       return;
     }
     
-    if(! insideCacheFlush) {
+    if (!this.insideCacheFlush) {
       throw new IOException("Impossible situation: inside " +
         "completeCacheFlush(), but 'insideCacheFlush' flag is false");
     }
@@ -442,6 +441,16 @@
     regionToLastFlush.put(regionName, logSeqId);
 
     insideCacheFlush = false;
+    notifyAll();
+  }
+  
+  /**
+   * Abort a cache flush.
+   * This method will clear waits on {@link #insideCacheFlush} but if this
+   * method is called, we are losing data.  TODO: Fix.
+   */
+  synchronized void abort() {
+    this.insideCacheFlush = false;
     notifyAll();
   }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Sat Sep 15 08:14:53 2007
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.lang.reflect.Constructor;
+import java.net.ConnectException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,6 +38,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
@@ -57,6 +60,8 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.Writables;
 
 
@@ -64,33 +69,33 @@
  * HMaster is the "master server" for a HBase.
  * There is only one HMaster for a single HBase deployment.
  */
-public class HMaster implements HConstants, HMasterInterface, 
-HMasterRegionInterface, Runnable {
+public class HMaster extends Thread implements HConstants, HMasterInterface, 
+HMasterRegionInterface {
+  static final Log LOG = LogFactory.getLog(HMaster.class.getName());
 
-  /** {@inheritDoc} */
   public long getProtocolVersion(String protocol,
-      @SuppressWarnings("unused") long clientVersion) throws IOException {
-
+      @SuppressWarnings("unused") long clientVersion)
+  throws IOException {
     if (protocol.equals(HMasterInterface.class.getName())) {
       return HMasterInterface.versionID; 
-
     } else if (protocol.equals(HMasterRegionInterface.class.getName())) {
       return HMasterRegionInterface.versionID;
-
     } else {
       throw new IOException("Unknown protocol to name node: " + protocol);
     }
   }
 
-  static final Log LOG = LogFactory.getLog(HMaster.class.getName());
-
-  volatile boolean closed;
+  // We start out with closed flag on.  Using AtomicBoolean rather than
+  // plain boolean because we want to pass a reference to supporting threads
+  // started here in HMaster rather than have them have to know about the
+  // hosting class
+  volatile AtomicBoolean closed = new AtomicBoolean(true);
   volatile boolean fsOk;
   Path dir;
   Configuration conf;
   FileSystem fs;
   Random rand;
-  long threadWakeFrequency; 
+  int threadWakeFrequency; 
   int numRetries;
   long maxRegionOpenTime;
 
@@ -102,12 +107,15 @@
 
   HConnection connection;
 
-  long metaRescanInterval;
+  int metaRescanInterval;
 
   final AtomicReference<HServerAddress> rootRegionLocation =
     new AtomicReference<HServerAddress>();
   
   Lock splitLogLock = new ReentrantLock();
+  
+  // A Sleeper that sleeps for threadWakeFrequency
+  protected Sleeper sleeper;
 
   /**
   * Base HRegion scanner class. Holds utility common to <code>ROOT</code> and
@@ -156,31 +164,28 @@
    * <p>A <code>META</code> region is not 'online' until it has been scanned
    * once.
    */
-  abstract class BaseScanner implements Runnable {
+  abstract class BaseScanner extends Chore {
     protected boolean rootRegion;
     protected final Text tableName;
 
     protected abstract void initialScan();
     protected abstract void maintenanceScan();
 
-    BaseScanner(final Text tableName) {
-      super();
+    BaseScanner(final Text tableName, final int period,
+        final AtomicBoolean stop) {
+      super(period, stop);
       this.tableName = tableName;
       this.rootRegion = tableName.equals(ROOT_TABLE_NAME);
     }
-
-    /** {@inheritDoc} */
-    public void run() {
+    
+    @Override
+    protected void initialChore() {
       initialScan();
-      while (!closed) {
-        try {
-          Thread.sleep(metaRescanInterval);
-        } catch (InterruptedException e) {
-          continue;
-        }
-        maintenanceScan();
-      }
-      LOG.info(this.getClass().getSimpleName() + " exiting");
+    }
+    
+    @Override
+    protected void chore() {
+      maintenanceScan();
     }
 
     /**
@@ -228,7 +233,6 @@
 
           // Note Region has been assigned.
           checkAssigned(info, serverName, startCode);
-
           if (isSplitParent(info)) {
             splitParents.put(info, results);
           }
@@ -237,11 +241,9 @@
         if (rootRegion) {
           numberOfMetaRegions.set(numberOfRegionsFound);
         }
-
       } catch (IOException e) {
         if (e instanceof RemoteException) {
           e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
           if (e instanceof UnknownScannerException) {
             // Reset scannerId so we do not try closing a scanner the other side
             // has lost account of: prevents duplicated stack trace out of the 
@@ -250,18 +252,14 @@
           }
         }
         throw e;
-
       } finally {
         try {
           if (scannerId != -1L && regionServer != null) {
             regionServer.close(scannerId);
           }
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          LOG.error("Closing scanner", e);
+          LOG.error("Closing scanner",
+            RemoteExceptionHandler.checkIOException(e));
         }
       }
 
@@ -468,18 +466,17 @@
   class RootScanner extends BaseScanner {
     /** Constructor */
     public RootScanner() {
-      super(HConstants.ROOT_TABLE_NAME);
+      super(HConstants.ROOT_TABLE_NAME, metaRescanInterval, closed);
     }
 
     private void scanRoot() {
       int tries = 0;
-      while (!closed && tries < numRetries) {
+      while (!closed.get() && tries < numRetries) {
         synchronized (rootRegionLocation) {
-          while(!closed && rootRegionLocation.get() == null) {
+          while(!closed.get() && rootRegionLocation.get() == null) {
             // rootRegionLocation will be filled in when we get an 'open region'
             // regionServerReport message from the HRegionServer that has been
             // allocated the ROOT region below.
-            
             try {
               rootRegionLocation.wait();
             } catch (InterruptedException e) {
@@ -487,7 +484,7 @@
             }
           }
         }
-        if (closed) {
+        if (closed.get()) {
           continue;
         }
 
@@ -499,24 +496,17 @@
           }
           break;
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            try {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            } catch (IOException ex) {
-              e = ex;
-            }
-          }
+          e = RemoteExceptionHandler.checkIOException(e);
           tries += 1;
           if (tries == 1) {
             LOG.warn("Scan ROOT region", e);
           } else {
             LOG.error("Scan ROOT region", e);
-            
             if (tries == numRetries - 1) {
               // We ran out of tries. Make sure the file system is still available
-
-              checkFileSystem();
+              if (checkFileSystem()) {
+                continue; // Avoid sleeping.
+              }
             }
           }
         } catch (Exception e) {
@@ -524,16 +514,7 @@
           // at least log it rather than go out silently.
           LOG.error("Unexpected exception", e);
         }
-        
-        if (!closed) {
-          // sleep before retry
-
-          try {
-            Thread.sleep(threadWakeFrequency);
-          } catch (InterruptedException e) {
-            // continue
-          }
-        }
+        sleeper.sleep();
       }      
     }
 
@@ -549,8 +530,7 @@
     }
   }
 
-  private RootScanner rootScanner;
-  private Thread rootScannerThread;
+  private RootScanner rootScannerThread;
   Integer rootScannerLock = new Integer(0);
 
   @SuppressWarnings("unchecked")
@@ -643,20 +623,17 @@
   class MetaScanner extends BaseScanner {
     /** Constructor */
     public MetaScanner() {
-      super(HConstants.META_TABLE_NAME);
+      super(HConstants.META_TABLE_NAME, metaRescanInterval, closed);
     }
 
     private void scanOneMetaRegion(MetaRegion region) {
       int tries = 0;
-      while (!closed && tries < numRetries) {
-        while (!closed && !rootScanned && rootRegionLocation.get() == null) {
-          try {
-            Thread.sleep(threadWakeFrequency);
-          } catch (InterruptedException e) {
-            // continue
-          }
+      while (!closed.get() && tries < numRetries) {
+        while (!closed.get() && !rootScanned &&
+            rootRegionLocation.get() == null) {
+          sleeper.sleep();
         }
-        if (closed) {
+        if (closed.get()) {
           continue;
         }
 
@@ -668,24 +645,23 @@
           }
           break;
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            try {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            } catch (IOException ex) {
-              e = ex;
-            }
-          }
+          e = RemoteExceptionHandler.checkIOException(e);
           tries += 1;
           if (tries == 1) {
             LOG.warn("Scan one META region", e);
           } else {
             LOG.error("Scan one META region", e);
-            
             if (tries == numRetries - 1) {
-              // We ran out of tries. Make sure the file system is still available
-
-              checkFileSystem();
+              // We ran out of tries. Make sure the file system is still
+              // available
+              if (checkFileSystem()) {
+                // If filesystem is OK, is the exception a ConnectException?
+                // If so, mark the server as down.  No point scanning either
+                // if no server to put meta region on.
+                if (e instanceof ConnectException) {
+                  LOG.debug("Region hosting server is gone.");
+                }
+              }
             }
           }
         } catch (Exception e) {
@@ -693,21 +669,14 @@
           // at least log it rather than go out silently.
           LOG.error("Unexpected exception", e);
         }
-        if (!closed) {
-          // sleep before retry
-          try {
-            Thread.sleep(threadWakeFrequency);                  
-          } catch (InterruptedException e) {
-            //continue
-          }
-        }
+        sleeper.sleep();
       }
     }
 
     @Override
     protected void initialScan() {
       MetaRegion region = null;
-      while (!closed && region == null && !metaRegionsScanned()) {
+      while (!closed.get() && region == null && !metaRegionsScanned()) {
         try {
           region =
             metaRegionsToScan.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
@@ -752,10 +721,9 @@
      * been scanned.
      */
     synchronized boolean waitForMetaRegionsOrClose() {
-      while (!closed) {
+      while (!closed.get()) {
         if (rootScanned &&
             numberOfMetaRegions.get() == onlineMetaRegions.size()) {
-
           break;
         }
 
@@ -765,12 +733,11 @@
           // continue
         }
       }
-      return closed;
+      return closed.get();
     }
   }
 
-  MetaScanner metaScanner;
-  private Thread metaScannerThread;
+  MetaScanner metaScannerThread;
   Integer metaScannerLock = new Integer(0);
 
   /**
@@ -840,16 +807,14 @@
 
   /** 
    * Build the HMaster
-   * @param dir         - base directory
-   * @param address     - server address and port number
-   * @param conf        - configuration
+   * @param dir base directory
+   * @param address server address and port number
+   * @param conf configuration
    * 
    * @throws IOException
    */
   public HMaster(Path dir, HServerAddress address, Configuration conf)
-    throws IOException {
-    
-    this.closed = true;
+  throws IOException {
     this.fsOk = true;
     this.dir = dir;
     this.conf = conf;
@@ -861,9 +826,7 @@
     LOG.info("Root region dir: " + rootRegionDir.toString());
 
     try {
-
       // Make sure the root directory exists!
-
       if(! fs.exists(dir)) {
         fs.mkdirs(dir);
       }
@@ -887,9 +850,7 @@
           meta.getLog().closeAndDelete();
 
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          }
+          e = RemoteExceptionHandler.checkIOException(e);
           LOG.error("bootstrap", e);
           throw e;
         }
@@ -900,7 +861,7 @@
       throw e;
     }
 
-    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
     this.maxRegionOpenTime =
       conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
@@ -908,8 +869,8 @@
     this.msgQueue = new LinkedBlockingQueue<PendingOperation>();
     
     this.serverLeases = new Leases(
-        conf.getLong("hbase.master.lease.period", 30 * 1000), 
-        conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+        conf.getInt("hbase.master.lease.period", 30 * 1000), 
+        conf.getInt("hbase.master.lease.thread.wakefrequency", 15 * 1000));
     
     this.server = RPC.getServer(this, address.getBindAddress(),
         address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
@@ -923,13 +884,12 @@
     this.connection = HConnectionManager.getConnection(conf);
 
     this.metaRescanInterval =
-      conf.getLong("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
+      conf.getInt("hbase.master.meta.thread.rescanfrequency", 60 * 1000);
 
     // The root region
 
     this.rootScanned = false;
-    this.rootScanner = new RootScanner();
-    this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
+    this.rootScannerThread = new RootScanner();
 
     // Scans the meta table
 
@@ -941,8 +901,7 @@
 
     this.initialMetaScanComplete = false;
 
-    this.metaScanner = new MetaScanner();
-    this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
+    this.metaScannerThread = new MetaScanner();
 
     this.unassignedRegions = 
       Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
@@ -973,8 +932,10 @@
     this.loadToServers = new TreeMap<HServerLoad, Set<String>>();
     this.serversToLoad = new HashMap<String, HServerLoad>();
 
+    this.sleeper = new Sleeper(this.threadWakeFrequency, this.closed);
+    
     // We're almost open for business
-    this.closed = false;
+    this.closed.set(false);
     LOG.info("HMaster initialized on " + this.address.toString());
   }
 
@@ -988,7 +949,7 @@
     if (fsOk) {
       if (!FSUtils.isFileSystemAvailable(fs)) {
         LOG.fatal("Shutting down HBase cluster: file system not available");
-        closed = true;
+        closed.set(true);
         fsOk = false;
       }
     }
@@ -1002,108 +963,76 @@
 
   /** Main processing loop */
   public void run() {
-    Thread.currentThread().setName("HMaster");
-    try { 
-      // Start things up
-      this.serverLeases.start();
-      this.rootScannerThread.start();
-      this.metaScannerThread.start();
-
-      // Start the server last so everything else is running before we start
-      // receiving requests
-
-      this.server.start();
-    
-    } catch (IOException e) {
-      if (e instanceof RemoteException) {
-        try {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-
-        } catch (IOException ex) {
-          LOG.warn("thread start", ex);
-        }
-      }
-
-      // Something happened during startup. Shut things down.
-      
-      this.closed = true;
-      LOG.error("Failed startup", e);
-    }
-
+    final String threadName = "HMaster";
+    Thread.currentThread().setName(threadName);
+    startAllServices();
     /*
      * Main processing loop
      */
-     
-    for (PendingOperation op = null; !closed; ) {
-      try {
-        op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
-      
-      } catch (InterruptedException e) {
-        // continue
-      }
-      if (op == null || closed) {
-        continue;
-      }
-      try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Main processing loop: " + op.toString());
+    try {
+      for (PendingOperation op = null; !closed.get(); ) {
+        try {
+          op = msgQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          // continue
         }
-      
-        if (!op.process()) {
-          // Operation would have blocked because not all meta regions are
-          // online. This could cause a deadlock, because this thread is waiting
-          // for the missing meta region(s) to come back online, but since it
-          // is waiting, it cannot process the meta region online operation it
-          // is waiting for. So put this operation back on the queue for now.
-
-          if (msgQueue.size() == 0) {
-            // The queue is currently empty so wait for a while to see if what
-            // we need comes in first
+        if (op == null || closed.get()) {
+          continue;
+        }
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Main processing loop: " + op.toString());
+          }
 
+          if (!op.process()) {
+            // Operation would have blocked because not all meta regions are
+            // online. This could cause a deadlock, because this thread is waiting
+            // for the missing meta region(s) to come back online, but since it
+            // is waiting, it cannot process the meta region online operation it
+            // is waiting for. So put this operation back on the queue for now.
+            if (msgQueue.size() == 0) {
+              // The queue is currently empty so wait for a while to see if what
+              // we need comes in first
+              sleeper.sleep();
+            }
             try {
-              Thread.sleep(threadWakeFrequency);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Put " + op.toString() + " back on queue");
+              }
+              msgQueue.put(op);
             } catch (InterruptedException e) {
-              // continue
+              throw new RuntimeException("Putting into msgQueue was interrupted.", e);
             }
           }
-          try {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Put " + op.toString() + " back on queue");
+        } catch (Exception ex) {
+          if (ex instanceof RemoteException) {
+            try {
+              ex = RemoteExceptionHandler.decodeRemoteException(
+                  (RemoteException)ex);
+            } catch (IOException e) {
+              ex = e;
+              LOG.warn("main processing loop: " + op.toString(), e);
             }
+          }
+          if (!checkFileSystem()) {
+            break;
+          }
+          LOG.warn("Processing pending operations: " + op.toString(), ex);
+          try {
             msgQueue.put(op);
           } catch (InterruptedException e) {
             throw new RuntimeException("Putting into msgQueue was interrupted.", e);
           }
         }
-
-      } catch (Exception ex) {
-        if (ex instanceof RemoteException) {
-          try {
-            ex = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) ex);
-
-          } catch (IOException e) {
-            ex = e;
-            LOG.warn("main processing loop: " + op.toString(), e);
-          }
-        }
-        if (!checkFileSystem()) {
-          break;
-        }
-        LOG.warn("Processing pending operations: " + op.toString(), ex);
-        try {
-          msgQueue.put(op);
-        } catch (InterruptedException e) {
-          throw new RuntimeException("Putting into msgQueue was interrupted.", e);
-        }
       }
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception", t);
     }
     letRegionServersShutdown();
 
     /*
      * Clean up and close up shop
      */
-
     synchronized(rootScannerLock) {
       rootScannerThread.interrupt();    // Wake root scanner
     }
@@ -1136,6 +1065,40 @@
 
     LOG.info("HMaster main thread exiting");
   }
+  
+  /*
+   * Start up all services. If any of these threads gets an unhandled exception
+   * then it just dies with a logged message.  This should be fine because
+   * in general, we do not expect the master to get such unhandled exceptions
+   *  as OOMEs; it should be lightly loaded. See what HRegionServer does if
+   *  we need to install an unexpected exception handler.
+   */
+  private void startAllServices() {
+    String threadName = Thread.currentThread().getName();
+    try {
+      Threads.setDaemonThreadRunning(this.rootScannerThread,
+        threadName + ".rootScanner");
+      Threads.setDaemonThreadRunning(this.metaScannerThread,
+        threadName + ".metaScanner");
+      // Leases are not the same as Chore threads. Set name differently.
+      this.serverLeases.setName(threadName + ".leaseChecker");
+      this.serverLeases.start();
+      // Start the server last so everything else is running before we start
+      // receiving requests.
+      this.server.start();
+    } catch (IOException e) {
+      if (e instanceof RemoteException) {
+        try {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        } catch (IOException ex) {
+          LOG.warn("thread start", ex);
+        }
+      }
+      // Something happened during startup. Shut things down.
+      this.closed.set(true);
+      LOG.error("Failed startup", e);
+    }
+  }
 
   /*
    * Wait on regionservers to report in
@@ -1192,7 +1155,7 @@
       }
       serversToServerInfo.notifyAll();
     }
-    if (storedInfo != null && !closed) {
+    if (storedInfo != null && !closed.get()) {
       try {
         msgQueue.put(new PendingServerShutdown(storedInfo));
       } catch (InterruptedException e) {
@@ -1215,7 +1178,7 @@
       loadToServers.put(load, servers);
     }
 
-    if (!closed) {
+    if (!closed.get()) {
       long serverLabel = getServerLabel(s);
       serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(s));
     }
@@ -1247,7 +1210,7 @@
         // Get all the regions the server was serving reassigned
         // (if we are not shutting down).
 
-        if (!closed) {
+        if (!closed.get()) {
           for (int i = 1; i < msgs.length; i++) {
             HRegionInfo info = msgs[i].getRegionInfo();
 
@@ -1269,7 +1232,7 @@
       return new HMsg[0];
     }
 
-    if (closed) {
+    if (closed.get()) {
       // Tell server to shut down if we are shutting down.  This should
       // happen after check of MSG_REPORT_EXITING above, since region server
       // will send us one of these messages after it gets MSG_REGIONSERVER_STOP
@@ -1361,13 +1324,11 @@
       if (info != null) {
         // Only cancel lease and update load information once.
         // This method can be called a couple of times during shutdown.
-
         LOG.info("Cancelling lease for " + serverName);
         serverLeases.cancelLease(serverLabel, serverLabel);
         leaseCancelled = true;
 
         // update load information
-
         HServerLoad load = serversToLoad.remove(serverName);
         if (load != null) {
           Set<String> servers = loadToServers.get(load);
@@ -1763,17 +1724,11 @@
       try {
         while (true) {
           MapWritable values = null;
-
           try {
             values = server.next(scannerId);
-
           } catch (IOException e) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-
-            }
-            LOG.error("Shutdown scanning of meta region", e);
+            LOG.error("Shutdown scanning of meta region",
+              RemoteExceptionHandler.checkIOException(e));
             break;
           }
 
@@ -1805,20 +1760,16 @@
           // Check server name.  If null, be conservative and treat as though
           // region had been on shutdown server (could be null because we
           // missed edits in hlog because hdfs does not do write-append).
-
           String serverName;
           try {
             serverName = Writables.bytesToString(results.get(COL_SERVER));
-
           } catch(UnsupportedEncodingException e) {
             LOG.error("Server name", e);
             break;
           }
           if (serverName.length() > 0 &&
               deadServerName.compareTo(serverName) != 0) {
-          
             // This isn't the server you're looking for - move along
-            
             if (LOG.isDebugEnabled()) {
               LOG.debug("Server name " + serverName + " is not same as " +
                   deadServerName + ": Passing");
@@ -1827,12 +1778,10 @@
           }
 
           // Bingo! Found it.
-          
           HRegionInfo info = null;
           try {
             info = (HRegionInfo) Writables.getWritable(
                 results.get(COL_REGIONINFO), new HRegionInfo());
-            
           } catch (IOException e) {
             LOG.error("Read fields", e);
             break;
@@ -1857,56 +1806,42 @@
               killList.put(deadServerName, regionsToKill);
               unassignedRegions.remove(info.regionName);
               assignAttempts.remove(info.regionName);
-            
               if (regionsToDelete.contains(info.regionName)) {
                 // Delete this region
-
                 regionsToDelete.remove(info.regionName);
                 todo.deleteRegion = true;
-
               } else {
                 // Mark region offline
-
                 todo.regionOffline = true;
               }
             }
             
           } else {
             // Get region reassigned
-
             regions.put(info.regionName, info);
            
             // If it was pending, remove.
             // Otherwise will obstruct its getting reassigned.
-            
             pendingRegions.remove(info.getRegionName());
           }
         }
-
       } finally {
         if(scannerId != -1L) {
           try {
             server.close(scannerId);
-
           } catch (IOException e) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
-            LOG.error("Closing scanner", e);
+            LOG.error("Closing scanner",
+              RemoteExceptionHandler.checkIOException(e));
           }
         }
       }
 
       // Remove server from root/meta entries
-      
       for (ToDoEntry e: toDoList) {
         BatchUpdate b = new BatchUpdate(rand.nextLong());
         long lockid = b.startUpdate(e.row);
-      
         if (e.deleteRegion) {
           b.delete(lockid, COL_REGIONINFO);
-        
         } else if (e.regionOffline) {
           e.info.offLine = true;
           b.put(lockid, COL_REGIONINFO, Writables.getBytes(e.info));
@@ -1917,11 +1852,9 @@
       }
 
       // Get regions reassigned
-
       for (Map.Entry<Text, HRegionInfo> e: regions.entrySet()) {
         Text region = e.getKey();
         HRegionInfo regionInfo = e.getValue();
-
         unassignedRegions.put(region, regionInfo);
         assignAttempts.put(region, Long.valueOf(0L));
       }
@@ -1937,20 +1870,17 @@
 
       if (!logSplit) {
         // Process the old log file
-
         StringBuilder dirName = new StringBuilder("log_");
         dirName.append(deadServer.getBindAddress());
         dirName.append("_");
         dirName.append(deadServer.getPort());
         Path logdir = new Path(dir, dirName.toString());
-
         if (fs.exists(logdir)) {
           if (!splitLogLock.tryLock()) {
             return false;
           }
           try {
             HLog.splitLog(dir, logdir, fs, conf);
-
           } finally {
             splitLogLock.unlock();
           }
@@ -1978,7 +1908,7 @@
         HRegionInterface server = null;
         long scannerId = -1L;
         for (int tries = 0; tries < numRetries; tries ++) {
-          if (closed) {
+          if (closed.get()) {
             return true;
           }
           if (rootRegionLocation.get() == null || !rootScanned) {
@@ -2007,11 +1937,7 @@
 
           } catch (IOException e) {
             if (tries == numRetries - 1) {
-              if (e instanceof RemoteException) {
-                e = RemoteExceptionHandler.decodeRemoteException(
-                    (RemoteException) e);
-              }
-              throw e;
+              throw RemoteExceptionHandler.checkIOException(e);
             }
           }
         }
@@ -2025,7 +1951,7 @@
 
       for (int tries = 0; tries < numRetries; tries++) {
         try {
-          if (closed) {
+          if (closed.get()) {
             return true;
           }
           if (!rootScanned ||
@@ -2072,11 +1998,7 @@
 
         } catch (IOException e) {
           if (tries == numRetries - 1) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
-            throw e;
+            throw RemoteExceptionHandler.checkIOException(e);
           }
         }
       }
@@ -2123,7 +2045,7 @@
     @Override
     boolean process() throws IOException {
       for (int tries = 0; tries < numRetries; tries++) {
-        if (closed) {
+        if (closed.get()) {
           return true;
         }
         LOG.info("region closed: " + regionInfo.regionName);
@@ -2191,11 +2113,7 @@
 
         } catch (IOException e) {
           if (tries == numRetries - 1) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
-            throw e;
+            throw RemoteExceptionHandler.checkIOException(e);
           }
           continue;
         }
@@ -2210,12 +2128,8 @@
       } else if (deleteRegion) {
         try {
           HRegion.deleteRegion(fs, dir, regionInfo.regionName);
-
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
+          e = RemoteExceptionHandler.checkIOException(e);
           LOG.error("failed delete region " + regionInfo.regionName, e);
           throw e;
         }
@@ -2262,7 +2176,7 @@
     @Override
     boolean process() throws IOException {
       for (int tries = 0; tries < numRetries; tries++) {
-        if (closed) {
+        if (closed.get()) {
           return true;
         }
         LOG.info(region.getRegionName() + " open on " + 
@@ -2354,11 +2268,7 @@
 
         } catch (IOException e) {
           if (tries == numRetries - 1) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
-            throw e;
+            throw RemoteExceptionHandler.checkIOException(e);
           }
         }
       }
@@ -2372,7 +2282,7 @@
 
   /** {@inheritDoc} */
   public boolean isMasterRunning() {
-    return !closed;
+    return !closed.get();
   }
 
   /** {@inheritDoc} */
@@ -2380,7 +2290,7 @@
     TimerTask tt = new TimerTask() {
       @Override
       public void run() {
-        closed = true;
+        closed.set(true);
         synchronized(msgQueue) {
           msgQueue.clear();                         // Empty the queue
           msgQueue.notifyAll();                     // Wake main thread
@@ -2404,8 +2314,7 @@
       try {
         // We can not access meta regions if they have not already been
         // assigned and scanned.  If we timeout waiting, just shutdown.
-    
-        if (metaScanner.waitForMetaRegionsOrClose()) {
+        if (this.metaScannerThread.waitForMetaRegionsOrClose()) {
           break;
         }
         createTable(newRegion);
@@ -2414,11 +2323,7 @@
       
       } catch (IOException e) {
         if (tries == numRetries - 1) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          throw e;
+          throw RemoteExceptionHandler.checkIOException(e);
         }
       }
     }
@@ -2559,7 +2464,7 @@
       // We can not access any meta region if they have not already been
       // assigned and scanned.
 
-      if (metaScanner.waitForMetaRegionsOrClose()) {
+      if (metaScannerThread.waitForMetaRegionsOrClose()) {
         throw new MasterNotRunningException(); // We're shutting down. Forget it.
       }
 
@@ -2657,12 +2562,8 @@
                 if (scannerId != -1L) {
                   try {
                     server.close(scannerId);
-
                   } catch (IOException e) {
-                    if (e instanceof RemoteException) {
-                      e = RemoteExceptionHandler.decodeRemoteException(
-                          (RemoteException) e);
-                    }
+                    e = RemoteExceptionHandler.checkIOException(e);
                     LOG.error("", e);
                   }
                 }
@@ -2682,14 +2583,8 @@
         } catch (IOException e) {
           if (tries == numRetries - 1) {
             // No retries left
-            
             checkFileSystem();
-
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-            }
-            throw e;
+            throw RemoteExceptionHandler.checkIOException(e);
           }
           continue;
         }
@@ -2869,11 +2764,8 @@
           HRegion.deleteRegion(fs, dir, i.regionName);
         
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          LOG.error("failed to delete region " + i.regionName, e);
+          LOG.error("failed to delete region " + i.regionName,
+            RemoteExceptionHandler.checkIOException(e));
         }
       }
       super.postProcessMeta(m, server);
@@ -2983,13 +2875,10 @@
     /** {@inheritDoc} */
     public void leaseExpired() {
       LOG.info(server + " lease expired");
-
       // Remove the server from the known servers list and update load info
-
       HServerInfo info;
       synchronized (serversToServerInfo) {
         info = serversToServerInfo.remove(server);
-        
         if (info != null) {
           String serverName = info.getServerAddress().toString();
           HServerLoad load = serversToLoad.remove(serverName);
@@ -3015,8 +2904,8 @@
         // continue.  We used to throw a RuntimeException here but on exit
         // this put is often interrupted.  For now, just log these iterrupts
         // rather than throw an exception
-        LOG.warn("MsgQueue.put was interrupted (If we are exiting, this msg " +
-          "can be ignored");
+        LOG.debug("MsgQueue.put was interrupted (If we are exiting, this " +
+          "msg can be ignored)");
       }
     }
   }
@@ -3031,11 +2920,8 @@
     System.exit(0);
   }
 
-  /**
-   * Main program
-   * @param args
-   */
-  public static void main(String [] args) {
+  protected static void doMain(String [] args,
+      Class<? extends HMaster> masterClass) {
     if (args.length < 1) {
       printUsageAndExit();
     }
@@ -3054,7 +2940,10 @@
 
       if (cmd.equals("start")) {
         try {
-          (new Thread(new HMaster(conf))).start();
+          Constructor<? extends HMaster> c =
+            masterClass.getConstructor(Configuration.class);
+          HMaster master = c.newInstance(conf);
+          master.start();
         } catch (Throwable t) {
           LOG.error( "Can not start master", t);
           System.exit(-1);
@@ -3076,5 +2965,13 @@
       // Print out usage if we get to here.
       printUsageAndExit();
     }
+  }
+  
+  /**
+   * Main program
+   * @param args
+   */
+  public static void main(String [] args) {
+    doMain(args, HMaster.class);
   }
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegion.java Sat Sep 15 08:14:53 2007
@@ -834,37 +834,45 @@
     // When execution returns from snapshotMemcacheForLog() with a non-NULL
     // value, the HMemcache will have a snapshot object stored that must be
     // explicitly cleaned up using a call to deleteSnapshot().
+    //
     HMemcache.Snapshot retval = memcache.snapshotMemcacheForLog(log);
     if(retval == null || retval.memcacheSnapshot == null) {
       LOG.debug("Finished memcache flush; empty snapshot");
       return;
     }
-    long logCacheFlushId = retval.sequenceId;
-    if(LOG.isDebugEnabled()) {
-      LOG.debug("Snapshotted memcache for region " +
-        this.regionInfo.regionName + " with sequence id " + retval.sequenceId +
-        " and entries " + retval.memcacheSnapshot.size());
-    }
-
-    // A.  Flush memcache to all the HStores.
-    // Keep running vector of all store files that includes both old and the
-    // just-made new flush store file.
-    for(HStore hstore: stores.values()) {
-      hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
-    }
+    try {
+      long logCacheFlushId = retval.sequenceId;
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("Snapshotted memcache for region " +
+            this.regionInfo.regionName + " with sequence id " +
+            retval.sequenceId + " and entries " +
+            retval.memcacheSnapshot.size());
+      }
 
-    // B.  Write a FLUSHCACHE-COMPLETE message to the log.
-    //     This tells future readers that the HStores were emitted correctly,
-    //     and that all updates to the log for this regionName that have lower 
-    //     log-sequence-ids can be safely ignored.
-    
-    log.completeCacheFlush(this.regionInfo.regionName,
-      regionInfo.tableDesc.getName(), logCacheFlushId);
+      // A.  Flush memcache to all the HStores.
+      // Keep running vector of all store files that includes both old and the
+      // just-made new flush store file.
+      for(HStore hstore: stores.values()) {
+        hstore.flushCache(retval.memcacheSnapshot, retval.sequenceId);
+      }
 
-    // C. Delete the now-irrelevant memcache snapshot; its contents have been 
-    //    dumped to disk-based HStores.
-    memcache.deleteSnapshot();
+      // B.  Write a FLUSHCACHE-COMPLETE message to the log.
+      //     This tells future readers that the HStores were emitted correctly,
+      //     and that all updates to the log for this regionName that have lower 
+      //     log-sequence-ids can be safely ignored.
 
+      log.completeCacheFlush(this.regionInfo.regionName,
+          regionInfo.tableDesc.getName(), logCacheFlushId);
+    } catch (IOException e) {
+      LOG.fatal("Interrupted while flushing. Edits lost. FIX! HADOOP-1903", e);
+      log.abort();
+      throw e;
+    } finally {
+      // C. Delete the now-irrelevant memcache snapshot; its contents have been 
+      //    dumped to disk-based HStores.
+      memcache.deleteSnapshot();
+    }
+    
     // D. Finally notify anyone waiting on memcache to clear:
     // e.g. checkResources().
     synchronized(this) {

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=575928&r1=575927&r2=575928&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Sat Sep 15 08:14:53 2007
@@ -20,11 +20,14 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.lang.reflect.Constructor;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
@@ -33,6 +36,7 @@
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -41,44 +45,43 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.filter.RowFilterInterface;
+import org.apache.hadoop.hbase.io.BatchOperation;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Sleeper;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hbase.filter.RowFilterInterface;
-import org.apache.hadoop.hbase.io.BatchUpdate;
-import org.apache.hadoop.hbase.io.BatchOperation;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Writables;
-
 /**
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
  */
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
+  static final Log LOG = LogFactory.getLog(HRegionServer.class);
   
-  /** {@inheritDoc} */
   public long getProtocolVersion(final String protocol, 
       @SuppressWarnings("unused") final long clientVersion)
-    throws IOException {
-    
+  throws IOException {  
     if (protocol.equals(HRegionInterface.class.getName())) {
       return HRegionInterface.versionID;
     }
     throw new IOException("Unknown protocol to name node: " + protocol);
   }
-
-  static final Log LOG = LogFactory.getLog(HRegionServer.class);
   
   // Set when a report to the master comes back with a message asking us to
   // shutdown.  Also set by call to stop when debugging or running unit tests
-  // of HRegionServer in isolation.
-  protected volatile boolean stopRequested;
+  // of HRegionServer in isolation. We use AtomicBoolean rather than
+  // plain boolean so we can pass a reference to Chore threads.  Otherwise,
+  // Chore threads need to know about the hosting class.
+  protected AtomicBoolean stopRequested = new AtomicBoolean(false);
   
   // Go down hard.  Used if file system becomes unavailable and also in
   // debugging and unit tests.
@@ -94,29 +97,55 @@
   
   // region name -> HRegion
   protected final SortedMap<Text, HRegion> onlineRegions;
-  protected final Map<Text, HRegion> retiringRegions = new HashMap<Text, HRegion>();
-  
+  protected final Map<Text, HRegion> retiringRegions =
+    new HashMap<Text, HRegion>();
+ 
   protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
   private final Vector<HMsg> outboundMsgs;
 
   int numRetries;
-  protected final long threadWakeFrequency;
-  private final long msgInterval;
+  protected final int threadWakeFrequency;
+  private final int msgInterval;
+  
+  // File paths
+  private FileSystem fs;
+
+  // Remote HMaster
+  private HMasterRegionInterface hbaseMaster;
+
+  // Server to handle client requests.  Default access so it can be accessed
+  // by unit tests.
+  Server server;
+  
+  // Leases
+  private Leases leases;
+  
+  // Request counter
+  private AtomicInteger requestCount;
   
+  // A sleeper that sleeps for msgInterval.
+  private final Sleeper sleeper;
+
   // Check to see if regions should be split
-  protected final long splitOrCompactCheckFrequency;
-  private final SplitOrCompactChecker splitOrCompactChecker;
   private final Thread splitOrCompactCheckerThread;
+  // Needed at shutdown. On the way out, if we can get this lock then we are
+  // not in the middle of a split or compaction: i.e. splits/compactions
+  // cannot be interrupted.
   protected final Integer splitOrCompactLock = new Integer(0);
   
-  /** Runs periodically to determine if regions need to be compacted or split */
-  class SplitOrCompactChecker implements Runnable, RegionUnavailableListener {
+  /**
+   * Runs periodically to determine if regions need to be compacted or split
+   */
+  class SplitOrCompactChecker extends Chore
+  implements RegionUnavailableListener {
     private HTable root = null;
     private HTable meta = null;
-  
-    /**
-     * {@inheritDoc}
-     */
+
+    public SplitOrCompactChecker(final AtomicBoolean stop) {
+      super(conf.getInt("hbase.regionserver.thread.splitcompactcheckfrequency",
+        30 * 1000), stop);
+    }
+
     public void closing(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -132,9 +161,6 @@
       }
     }
     
-    /**
-     * {@inheritDoc}
-     */
     public void closed(final Text regionName) {
       lock.writeLock().lock();
       try {
@@ -146,87 +172,58 @@
         lock.writeLock().unlock();
       }
     }
-
+    
     /**
-     * {@inheritDoc}
+     * Scan for splits or compactions to run.  Run any we find.
      */
-    public void run() {
-      while (!stopRequested) {
-        long startTime = System.currentTimeMillis();
-        synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
-          // Grab a list of regions to check
-          ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
-          lock.readLock().lock();
-          try {
-            regionsToCheck.addAll(onlineRegions.values());
-          } finally {
-            lock.readLock().unlock();
-          }
-          for(HRegion cur: regionsToCheck) {
-            if(cur.isClosed()) {
-              // Skip if closed
-              continue;
-            }
-            try {
-              if (cur.needsCompaction()) {
-                cur.compactStores();
-              }
-              // After compaction, it probably needs splitting.  May also need
-              // splitting just because one of the memcache flushes was big.
-              Text midKey = new Text();
-              if (cur.needsSplit(midKey)) {
-                split(cur, midKey);
-              }
-            } catch(IOException e) {
-              //TODO: What happens if this fails? Are we toast?
-              LOG.error("Split or compaction failed", e);
-              if (!checkFileSystem()) {
-                break;
-              }
-            }
+    protected void chore() {
+      // Don't interrupt us while we're working
+      synchronized (splitOrCompactLock) {
+        checkForSplitsOrCompactions();
+      }
+    }
+    
+    private void checkForSplitsOrCompactions() {
+      // Grab a list of regions to check
+      List<HRegion> nonClosedRegionsToCheck = getRegionsToCheck();
+      for(HRegion cur: nonClosedRegionsToCheck) {
+        try {
+          if (cur.needsCompaction()) {
+            cur.compactStores();
           }
-        }
-        
-        if (stopRequested) {
-          continue;
-        }
-
-        // Sleep
-        long waitTime = splitOrCompactCheckFrequency -
-          (System.currentTimeMillis() - startTime);
-        if (waitTime > 0) {
-          try {
-            Thread.sleep(waitTime);
-          } catch(InterruptedException iex) {
-            // continue
+          // After compaction, it probably needs splitting.  May also need
+          // splitting just because one of the memcache flushes was big.
+          Text midKey = new Text();
+          if (cur.needsSplit(midKey)) {
+            split(cur, midKey);
+          }
+        } catch(IOException e) {
+          //TODO: What happens if this fails? Are we toast?
+          LOG.error("Split or compaction failed", e);
+          if (!checkFileSystem()) {
+            break;
           }
         }
       }
-      LOG.info("splitOrCompactChecker exiting");
     }
     
     private void split(final HRegion region, final Text midKey)
-      throws IOException {
-      
+    throws IOException {
       final HRegionInfo oldRegionInfo = region.getRegionInfo();
       final HRegion[] newRegions = region.closeAndSplit(midKey, this);
       
       // When a region is split, the META table needs to updated if we're
       // splitting a 'normal' region, and the ROOT table needs to be
       // updated if we are splitting a META region.
-
       HTable t = null;
       if (region.getRegionInfo().tableDesc.getName().equals(META_TABLE_NAME)) {
         // We need to update the root region
-        
-        if (root == null) {
-          root = new HTable(conf, ROOT_TABLE_NAME);
+        if (this.root == null) {
+          this.root = new HTable(conf, ROOT_TABLE_NAME);
         }
         t = root;
-        
       } else {
         // For normal regions we need to update the meta region
-        
         if (meta == null) {
           meta = new HTable(conf, META_TABLE_NAME);
         }
@@ -234,184 +231,114 @@
       }
       LOG.info("Updating " + t.getTableName() + " with region split info");
 
-      // Remove old region from META
+      // Mark old region as offline and split in META.
       // NOTE: there is no need for retry logic here. HTable does it for us.
-      
       long lockid = t.startUpdate(oldRegionInfo.getRegionName());
       oldRegionInfo.offLine = true;
       oldRegionInfo.split = true;
       t.put(lockid, COL_REGIONINFO, Writables.getBytes(oldRegionInfo));
-
       t.put(lockid, COL_SPLITA, Writables.getBytes(
-          newRegions[0].getRegionInfo()));
-
+        newRegions[0].getRegionInfo()));
       t.put(lockid, COL_SPLITB, Writables.getBytes(
-          newRegions[1].getRegionInfo()));
+        newRegions[1].getRegionInfo()));
       t.commit(lockid);
       
       // Add new regions to META
-
       for (int i = 0; i < newRegions.length; i++) {
         lockid = t.startUpdate(newRegions[i].getRegionName());
-
         t.put(lockid, COL_REGIONINFO, Writables.getBytes(
-            newRegions[i].getRegionInfo()));
-        
+          newRegions[i].getRegionInfo()));
         t.commit(lockid);
       }
           
       // Now tell the master about the new regions
-      
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reporting region split to master");
       }
       reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
         newRegions[1].getRegionInfo());
-      
       LOG.info("region split, META update, and report to master all" +
         " successful. Old region=" + oldRegionInfo.getRegionName() +
         ", new regions: " + newRegions[0].getRegionName() + ", " +
         newRegions[1].getRegionName());
       
       // Do not serve the new regions. Let the Master assign them.
-      
     }
   }
 
   // Cache flushing  
-  private final Flusher cacheFlusher;
   private final Thread cacheFlusherThread;
+  // Needed during shutdown so we send an interrupt after completion of a
+  // flush, not in the midst.
   protected final Integer cacheFlusherLock = new Integer(0);
   
   /* Runs periodically to flush memcache.
    */
-  class Flusher implements Runnable {
-    /**
-     * {@inheritDoc}
-     */
-    public void run() {
-      while(! stopRequested) {
-        long startTime = System.currentTimeMillis();
-
-        synchronized(cacheFlusherLock) {
-          // Grab a list of items to flush
-          ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
-          lock.readLock().lock();
-          try {
-            toFlush.addAll(onlineRegions.values());
-          } finally {
-            lock.readLock().unlock();
-          }
-
-          // Flush them, if necessary
-          for(HRegion cur: toFlush) {
-            if(cur.isClosed()) {                // Skip if closed
-              continue;
-            }
-
-            try {
-              cur.optionallyFlush();
-            } catch (IOException iex) {
-              if (iex instanceof RemoteException) {
-                try {
-                  iex = RemoteExceptionHandler.decodeRemoteException((RemoteException) iex);
-                } catch (IOException x) {
-                  iex = x;
-                }
-              }
-              LOG.error("Cache flush failed", iex);
-              if (!checkFileSystem()) {
-                break;
-              }
-            }
-          }
-        }
-        
-        // Sleep
-        long waitTime = stopRequested? 0:
-          threadWakeFrequency - (System.currentTimeMillis() - startTime);
-        if(waitTime > 0) {
-          try {
-            Thread.sleep(waitTime);
-          } catch(InterruptedException iex) {
-            // continue
+  class Flusher extends Chore {
+    public Flusher(final int period, final AtomicBoolean stop) {
+      super(period, stop);
+    }
+    
+    protected void chore() {
+      synchronized(cacheFlusherLock) {
+        checkForFlushesToRun();
+      }
+    }
+    
+    private void checkForFlushesToRun() {
+      // Grab a list of items to flush
+      List<HRegion> nonClosedRegionsToFlush = getRegionsToCheck();
+      // Flush them, if necessary
+      for(HRegion cur: nonClosedRegionsToFlush) {
+        try {
+          cur.optionallyFlush();
+        } catch (IOException iex) {
+          LOG.error("Cache flush failed",
+            RemoteExceptionHandler.checkIOException(iex));
+          if (!checkFileSystem()) {
+            break;
           }
         }
       }
-      LOG.info("cacheFlusher exiting");
     }
   }
   
-  // File paths
-  
-  FileSystem fs;
-  
-  // Logging
-  
+  // HLog and HLog roller.
   protected final HLog log;
-  private final LogRoller logRoller;
   private final Thread logRollerThread;
   protected final Integer logRollerLock = new Integer(0);
   
-  /** Runs periodically to determine if the log should be rolled */
-  class LogRoller implements Runnable {
-    private int maxLogEntries =
+  /** Runs periodically to determine if the HLog should be rolled */
+  class LogRoller extends Chore {
+    private int MAXLOGENTRIES =
       conf.getInt("hbase.regionserver.maxlogentries", 30 * 1000);
     
-    /**
-     * {@inheritDoc}
-     */
-    public void run() {
-      while(!stopRequested) {
-        synchronized(logRollerLock) {
-          // If the number of log entries is high enough, roll the log.  This
-          // is a very fast operation, but should not be done too frequently.
-          int nEntries = log.getNumEntries();
-          if(nEntries > this.maxLogEntries) {
-            try {
-              LOG.info("Rolling hlog. Number of entries: " + nEntries);
-              log.rollWriter();
-            } catch (IOException iex) {
-              if (iex instanceof RemoteException) {
-                try {
-                  iex = RemoteExceptionHandler.
-                    decodeRemoteException((RemoteException) iex);
-                } catch (IOException x) {
-                  iex = x;
-                }
-              }
-              LOG.error("Log rolling failed", iex);
-              if (!checkFileSystem()) {
-                break;
-              }
-            }
-          }
-        }
-        if(!stopRequested) {
-          try {
-            Thread.sleep(threadWakeFrequency);
-          } catch(InterruptedException iex) {
-            // continue
-          }
+    public LogRoller(final int period, final AtomicBoolean stop) {
+      super(period, stop);
+    }
+ 
+    protected void chore() {
+      synchronized(logRollerLock) {
+        checkForLogRoll();
+      }
+    }
+
+    private void checkForLogRoll() {
+      // If the number of log entries is high enough, roll the log.  This
+      // is a very fast operation, but should not be done too frequently.
+      int nEntries = log.getNumEntries();
+      if(nEntries > this.MAXLOGENTRIES) {
+        try {
+          LOG.info("Rolling hlog. Number of entries: " + nEntries);
+          log.rollWriter();
+        } catch (IOException iex) {
+          LOG.error("Log rolling failed",
+            RemoteExceptionHandler.checkIOException(iex));
+          checkFileSystem();
         }
       }
-      LOG.info("logRoller exiting");
     }
   }
-  
-  // Remote HMaster
-
-  private HMasterRegionInterface hbaseMaster;
-
-  // Server
-  
-  private Server server;
-  
-  // Leases
-  private Leases leases;
-  
-  // Request counter
-  private AtomicInteger requestCount;
 
   /**
    * Starts a HRegionServer at the default location
@@ -433,10 +360,8 @@
    * @throws IOException
    */
   public HRegionServer(Path rootDir, HServerAddress address,
-      Configuration conf) throws IOException {
-    
-    // Basic setup
-    this.stopRequested = false;
+      Configuration conf)
+  throws IOException {  
     this.abortRequested = false;
     this.fsOk = true;
     this.rootDir = rootDir;
@@ -450,25 +375,22 @@
 
     // Config'ed params
     this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
-    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
-    this.msgInterval = conf.getLong("hbase.regionserver.msginterval",
-      3 * 1000);
-    this.splitOrCompactCheckFrequency =
-      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
-      30 * 1000);
-
-    // Cache flushing
-    this.cacheFlusher = new Flusher();
-    this.cacheFlusherThread = new Thread(cacheFlusher);
-    
-    // Check regions to see if they need to be split
-    this.splitOrCompactChecker = new SplitOrCompactChecker();
-    this.splitOrCompactCheckerThread = new Thread(splitOrCompactChecker);
+    this.threadWakeFrequency = conf.getInt(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000);
+
+    // Cache flushing chore thread.
+    this.cacheFlusherThread =
+      new Flusher(this.threadWakeFrequency, stopRequested);
+    
+    // Check regions to see if they need to be split or compacted chore thread
+    this.splitOrCompactCheckerThread =
+      new SplitOrCompactChecker(this.stopRequested);
     
-    // Process requests from Master
+    // Task thread to process requests from Master
     this.toDo = new LinkedBlockingQueue<ToDoEntry>();
     this.worker = new Worker();
     this.workerThread = new Thread(worker);
+    this.sleeper = new Sleeper(this.msgInterval, this.stopRequested);
 
     try {
       // Server to handle client requests
@@ -499,8 +421,8 @@
       }
       
       this.log = new HLog(fs, logdir, conf);
-      this.logRoller = new LogRoller();
-      this.logRollerThread = new Thread(logRoller);
+      this.logRollerThread =
+        new LogRoller(this.threadWakeFrequency, stopRequested);
 
       // Remote HMaster
       this.hbaseMaster = (HMasterRegionInterface)RPC.waitForProxy(
@@ -508,11 +430,8 @@
           new HServerAddress(conf.get(MASTER_ADDRESS)).getInetSocketAddress(),
           conf);
     } catch (IOException e) {
-      this.stopRequested = true;
-      if (e instanceof RemoteException) {
-        e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-      }
-      throw e;
+      this.stopRequested.set(true);
+      throw RemoteExceptionHandler.checkIOException(e);
     }
   }
 
@@ -527,14 +446,15 @@
    * <p>FOR DEBUGGING ONLY
    */
   synchronized void stop() {
-    stopRequested = true;
+    stopRequested.set(true);
     notifyAll();                        // Wakes run() if it is sleeping
   }
   
   /**
    * Cause the server to exit without closing the regions it is serving, the
    * log it is using and without notifying the master.
-   * <p>FOR DEBUGGING ONLY
+   * Used in unit testing and on catastrophic events, such as when HDFS is
+   * yanked out from under hbase or we OOME.
    */
   synchronized void abort() {
     abortRequested = true;
@@ -574,198 +494,138 @@
     LOG.info("HRegionServer stopped at: " +
       serverInfo.getServerAddress().toString());
   }
-  
+
   /**
    * The HRegionServer sticks in this loop until closed. It repeatedly checks
    * in with the HMaster, sending heartbeats & reports, and receiving HRegion 
    * load/unload instructions.
    */
   public void run() {
+    startAllServices();
     
-    // Threads
-    
-    String threadName = Thread.currentThread().getName();
-
-    workerThread.setName(threadName + ".worker");
-    workerThread.start();
-    cacheFlusherThread.setName(threadName + ".cacheFlusher");
-    cacheFlusherThread.start();
-    splitOrCompactCheckerThread.setName(threadName + ".splitOrCompactChecker");
-    splitOrCompactCheckerThread.start();
-    logRollerThread.setName(threadName + ".logRoller");
-    logRollerThread.start();
-    leases = new Leases(conf.getLong("hbase.regionserver.lease.period", 
-        3 * 60 * 1000), threadWakeFrequency);
-    leases.start();
-    
-    // Server
-    
+    // Set below if HMaster asked us to stop.
     boolean masterRequestedStop = false;
+    
     try {
-      this.server.start();
-      LOG.info("HRegionServer started at: " +
-        serverInfo.getServerAddress().toString());
-    } catch(IOException e) {
-      stopRequested = true;
-      if (e instanceof RemoteException) {
+      while(!stopRequested.get()) {
+        long lastMsg = 0;
         try {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-          
-        } catch (IOException ex) {
-          e = ex;
+          reportForDuty();
+        } catch(IOException e) {
+          this.sleeper.sleep(lastMsg);
+          continue;
         }
-      }
-      LOG.error("", e);
-    }
-
-    while(! stopRequested) {
-      long lastMsg = 0;
-      long waitTime;
 
-      // Let the master know we're here
-      try {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Telling master we are up");
-        }
-        requestCount.set(0);
-        serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
-        hbaseMaster.regionServerStartup(serverInfo);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Done telling master we are up");
-        }
-      } catch(IOException e) {
-        waitTime = stopRequested ? 0
-            : msgInterval - (System.currentTimeMillis() - lastMsg);
-        if(waitTime > 0) {
-          synchronized (this) {
-            try {
-              wait(waitTime);
-            } catch (InterruptedException e1) {
-              // Go back up to the while test if stop has been requested.
+        // Now ask master what it wants us to do and tell it what we have done
+        for (int tries = 0; !stopRequested.get();) {
+          if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
+            HMsg outboundArray[] = null;
+            synchronized(outboundMsgs) {
+              outboundArray =
+                this.outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
+              this.outboundMsgs.clear();
             }
-          }
-        }
-        continue;
-      }
-      
-      // Now ask master what it wants us to do and tell it what we have done.
-      while (!stopRequested) {
-        if ((System.currentTimeMillis() - lastMsg) >= msgInterval) {
-          HMsg outboundArray[] = null;
-          synchronized(outboundMsgs) {
-            outboundArray = outboundMsgs.toArray(new HMsg[outboundMsgs.size()]);
-            outboundMsgs.clear();
-          }
-
-          try {
-            serverInfo.setLoad(new HServerLoad(requestCount.get(),
-                onlineRegions.size()));
-            requestCount.set(0);
-            
-            HMsg msgs[] =
-              hbaseMaster.regionServerReport(serverInfo, outboundArray);
-            lastMsg = System.currentTimeMillis();
-            
-            // Queue up the HMaster's instruction stream for processing
-            
-            boolean restart = false;
-            for(int i = 0; i < msgs.length && !stopRequested && !restart; i++) {
-              switch(msgs[i].getMsg()) {
-
-              case HMsg.MSG_CALL_SERVER_STARTUP:
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Got call server startup message");
-                }
-                if (fsOk) {
-                  closeAllRegions();
-                  restart = true;
-                }
-                break;
 
-              case HMsg.MSG_REGIONSERVER_STOP:
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Got regionserver stop message");
-                }
-                masterRequestedStop = true;
-                stopRequested = true;
-                break;
+            try {
+              this.serverInfo.setLoad(new HServerLoad(requestCount.get(),
+                  onlineRegions.size()));
+              this.requestCount.set(0);
+              HMsg msgs[] =
+                this.hbaseMaster.regionServerReport(serverInfo, outboundArray);
+              lastMsg = System.currentTimeMillis();
+              // Queue up the HMaster's instruction stream for processing
+              boolean restart = false;
+              for(int i = 0; i < msgs.length && !stopRequested.get() &&
+                  !restart; i++) {
+                switch(msgs[i].getMsg()) {
+                
+                case HMsg.MSG_CALL_SERVER_STARTUP:
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got call server startup message");
+                  }
+                  // We get MSG_CALL_SERVER_STARTUP on startup but we can also
+                  // get it when the master is panicking because for instance
+                  // the HDFS has been yanked out from under it.  Be wary of
+                  // this message.
+                  if (checkFileSystem()) {
+                    closeAllRegions();
+                    restart = true;
+                  }
+                  
+                  break;
 
-              default:
-                if (fsOk) {
-                  try {
-                    toDo.put(new ToDoEntry(msgs[i]));
-                  } catch (InterruptedException e) {
-                    throw new RuntimeException("Putting into msgQueue was interrupted.", e);
+                case HMsg.MSG_REGIONSERVER_STOP:
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got regionserver stop message");
+                  }
+                  masterRequestedStop = true;
+                  stopRequested.set(true);
+                  break;
+
+                default:
+                  if (fsOk) {
+                    try {
+                      toDo.put(new ToDoEntry(msgs[i]));
+                    } catch (InterruptedException e) {
+                      throw new RuntimeException("Putting into msgQueue was " +
+                        "interrupted.", e);
+                    }
                   }
                 }
               }
-            }
-
-            if(restart || stopRequested) {
-              toDo.clear();
-              break;
-            }
-
-          } catch (IOException e) {
-            if (e instanceof RemoteException) {
-              try {
-                e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-                
-              } catch (IOException ex) {
-                e = ex;
+              if (restart || this.stopRequested.get()) {
+                toDo.clear();
+                break;
+              }
+              // Reset tries count if we had a successful transaction.
+              tries = 0;
+            } catch (IOException e) {
+              e = RemoteExceptionHandler.checkIOException(e);
+              if(tries < this.numRetries) {
+                LOG.warn("", e);
+                tries++;
+              } else {
+                LOG.error("Exceeded max retries: " + this.numRetries, e);
+                if (!checkFileSystem()) {
+                  continue;
+                }
+                // Something seriously wrong. Shutdown.
+                stop();
               }
             }
-            LOG.error("", e);
-          }
-        }
-
-        waitTime = stopRequested ? 0
-            : msgInterval - (System.currentTimeMillis() - lastMsg);
-        if (waitTime > 0) {
-          synchronized (this) {
-            try {
-              wait(waitTime);
-            } catch(InterruptedException iex) {
-              // On interrupt we go around to the while test of stopRequested
-            }
-          }
+          } // if ((System.currentTimeMillis() - lastMsg) >= msgInterval)
+          this.sleeper.sleep(lastMsg);
         }
       }
+    } catch (Throwable t) {
+      LOG.fatal("Unhandled exception. Aborting...", t);
+      abort();
     }
     leases.closeAfterLeasesExpire();
     this.worker.stop();
     this.server.stop();
     
     // Send interrupts to wake up threads if sleeping so they notice shutdown.
-
+    // TODO: Should we check they are alive?  If OOME could have exited already
     synchronized(logRollerLock) {
       this.logRollerThread.interrupt();
     }
-
     synchronized(cacheFlusherLock) {
       this.cacheFlusherThread.interrupt();
     }
-
     synchronized(splitOrCompactLock) {
       this.splitOrCompactCheckerThread.interrupt();
     }
 
     if (abortRequested) {
-      if (fsOk) {
+      if (this.fsOk) {
         // Only try to clean up if the file system is available
-
         try {
           log.close();
           LOG.info("On abort, closed hlog");
         } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            try {
-              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            } catch (IOException ex) {
-              e = ex;
-            }
-          }
-          LOG.error("Unable to close log in abort", e);
+          LOG.error("Unable to close log in abort",
+              RemoteExceptionHandler.checkIOException(e));
         }
         closeAllRegions(); // Don't leave any open file handles
       }
@@ -776,18 +636,10 @@
       try {
         log.closeAndDelete();
       } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          try {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            
-          } catch (IOException ex) {
-            e = ex;
-          }
-        }
-        LOG.error("", e);
+        LOG.error("", RemoteExceptionHandler.checkIOException(e));
       }
       try {
-        if (!masterRequestedStop) {
+        if (!masterRequestedStop && closedRegions != null) {
           HMsg[] exitMsg = new HMsg[closedRegions.size() + 1];
           exitMsg[0] = new HMsg(HMsg.MSG_REPORT_EXITING);
           // Tell the master what regions we are/were serving
@@ -802,15 +654,7 @@
           hbaseMaster.regionServerReport(serverInfo, exitMsg);
         }
       } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          try {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            
-          } catch (IOException ex) {
-            e = ex;
-          }
-        }
-        LOG.warn("", e);
+        LOG.warn("", RemoteExceptionHandler.checkIOException(e));
       }
       LOG.info("stopping server at: " +
         serverInfo.getServerAddress().toString());
@@ -820,6 +664,73 @@
     LOG.info("main thread exiting");
   }
 
+  /*
+   * Start Chore Threads, Server, Worker and lease checker threads. Install an
+   * UncaughtExceptionHandler that calls abort of RegionServer if we get
+   * an unhandled exception.  We cannot set the handler on all threads.
+   * Server's internal Listener thread is off limits.  For Server, if an OOME,
+   * it waits a while then retries.  Meantime, a flush or a compaction that
+   * tries to run should trigger same critical condition and the shutdown will
+   * run.  On its way out, this server will shut down Server.  Leases are sort
+   * of inbetween. It has an internal thread that while it inherits from
+   * Chore, it keeps its own internal stop mechanism so needs to be stopped
+   * by this hosting server.
+   */
+  private void startAllServices() {
+    String n = Thread.currentThread().getName();
+    UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
+      public void uncaughtException(Thread t, Throwable e) {
+        abort();
+        LOG.fatal("Set stop flag in " + t.getName(), e);
+      }
+    };
+    Threads.setDaemonThreadRunning(this.cacheFlusherThread, n + ".cacheFlusher",
+      handler);
+    Threads.setDaemonThreadRunning(this.splitOrCompactCheckerThread,
+      n + ".splitOrCompactChecker", handler);
+    Threads.setDaemonThreadRunning(this.logRollerThread, n + ".logRoller",
+      handler);
+    // Worker is not the same as the above threads in that it does not
+    // inherit from Chore.  Set an UncaughtExceptionHandler on it in case it's
+    // the one to see an OOME, etc., first.  The handler will set the stop
+    // flag.
+    Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
+    // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
+    // an unhandled exception, it will just exit.
+    this.leases = new Leases(
+      conf.getInt("hbase.regionserver.lease.period", 3 * 60 * 1000),
+      this.threadWakeFrequency);
+    this.leases.setName(n + ".leaseChecker");
+    this.leases.start();
+    // Start Server.  This service is like leases in that it internally runs
+    // a thread.
+    try {
+      this.server.start();
+      LOG.info("HRegionServer started at: " +
+        serverInfo.getServerAddress().toString());
+    } catch(IOException e) {
+      this.stopRequested.set(true);
+      LOG.fatal("Failed start Server",
+        RemoteExceptionHandler.checkIOException(e));
+    }
+  }
+  
+  /*
+   * Let the master know we're here
+   * @throws IOException
+   */
+  private void reportForDuty() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Telling master we are up");
+    }
+    this.requestCount.set(0);
+    this.serverInfo.setLoad(new HServerLoad(0, onlineRegions.size()));
+    this.hbaseMaster.regionServerStartup(serverInfo);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Done telling master we are up");
+    }
+  }
+
   /** Add to the outbound message buffer */
   private void reportOpen(HRegion region) {
     synchronized(outboundMsgs) {
@@ -877,17 +788,15 @@
       }
     }
     
-    /**
-     * {@inheritDoc}
-     */
     public void run() {
-      for(ToDoEntry e = null; !stopRequested; ) {
+      try {
+      for(ToDoEntry e = null; !stopRequested.get(); ) {
         try {
           e = toDo.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
         } catch (InterruptedException ex) {
           // continue
         }
-        if(e == null || stopRequested) {
+        if(e == null || stopRequested.get()) {
           continue;
         }
         try {
@@ -895,15 +804,18 @@
           
           switch(e.msg.getMsg()) {
 
-          case HMsg.MSG_REGION_OPEN:                    // Open a region
+          case HMsg.MSG_REGION_OPEN:
+            // Open a region
             openRegion(e.msg.getRegionInfo());
             break;
 
-          case HMsg.MSG_REGION_CLOSE:                   // Close a region
+          case HMsg.MSG_REGION_CLOSE:
+            // Close a region
             closeRegion(e.msg.getRegionInfo(), true);
             break;
 
-          case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:    // Close a region, don't reply
+          case HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT:
+            // Close a region, don't reply
             closeRegion(e.msg.getRegionInfo(), false);
             break;
 
@@ -913,14 +825,7 @@
                 + e.msg.toString());
           }
         } catch (IOException ie) {
-          if (ie instanceof RemoteException) {
-            try {
-              ie = RemoteExceptionHandler.decodeRemoteException((RemoteException) ie);
-              
-            } catch (IOException x) {
-              ie = x;
-            }
-          }
+          ie = RemoteExceptionHandler.checkIOException(ie);
           if(e.tries < numRetries) {
             LOG.warn(ie);
             e.tries++;
@@ -937,7 +842,11 @@
           }
         }
       }
-      LOG.info("worker thread exiting");
+      } catch(Throwable t) {
+        LOG.fatal("Unhandled exception", t);
+      } finally {
+        LOG.info("worker thread exiting");
+      }
     }
   }
   
@@ -991,15 +900,8 @@
       try {
         region.close(abortRequested);
       } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          try {
-            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            
-          } catch (IOException x) {
-            e = x;
-          }
-        }
-        LOG.error("error closing region " + region.getRegionName(), e);
+        LOG.error("error closing region " + region.getRegionName(),
+          RemoteExceptionHandler.checkIOException(e));
       }
     }
     return regionsToClose;
@@ -1188,14 +1090,7 @@
       leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
       return scannerId;
     } catch (IOException e) {
-      if (e instanceof RemoteException) {
-        try {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        } catch (IOException x) {
-          e = x;
-        }
-      }
-      LOG.error("", e);
+      LOG.error("", RemoteExceptionHandler.checkIOException(e));
       checkFileSystem();
       throw e;
     }
@@ -1344,16 +1239,39 @@
    * 
    * @return false if file system is not available
    */
-  protected boolean checkFileSystem() {
-    if (fsOk) {
+  protected synchronized boolean checkFileSystem() {
+    if (this.fsOk) {
       if (!FSUtils.isFileSystemAvailable(fs)) {
         LOG.fatal("Shutting down HRegionServer: file system not available");
-        abortRequested = true;
-        stopRequested = true;
+        this.abortRequested = true;
+        this.stopRequested.set(true);
         fsOk = false;
       }
     }
-    return fsOk;
+    return this.fsOk;
+  }
+ 
+  /**
+   * @return Returns list of non-closed regions hosted on this server.  If no
+   * regions to check, returns an empty list.
+   */
+  protected List<HRegion> getRegionsToCheck() {
+    ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
+    lock.readLock().lock();
+    try {
+      regionsToCheck.addAll(this.onlineRegions.values());
+    } finally {
+      lock.readLock().unlock();
+    }
+    // Purge closed regions.
+    for (final ListIterator<HRegion> i = regionsToCheck.listIterator();
+        i.hasNext();) {
+      HRegion r = i.next();
+      if (r.isClosed()) {
+        i.remove();
+      }
+    }
+    return regionsToCheck;
   }
 
   //
@@ -1374,13 +1292,15 @@
   }
   
   /**
+   * Do class main.
    * @param args
+   * @param regionServerClass HRegionServer to instantiate.
    */
-  public static void main(String [] args) {
+  protected static void doMain(final String [] args,
+      final Class<? extends HRegionServer> regionServerClass) {
     if (args.length < 1) {
       printUsageAndExit();
     }
-    
     Configuration conf = new HBaseConfiguration();
     
     // Process command-line args. TODO: Better cmd-line processing
@@ -1394,7 +1314,13 @@
       
       if (cmd.equals("start")) {
         try {
-          (new Thread(new HRegionServer(conf))).start();
+          
+          Constructor<? extends HRegionServer> c =
+            regionServerClass.getConstructor(Configuration.class);
+          HRegionServer hrs = c.newInstance(conf);
+          Thread t = new Thread(hrs);
+          t.setName("regionserver" + hrs.server.getListenerAddress());
+          t.start();
         } catch (Throwable t) {
           LOG.error( "Can not start region server because "+
               StringUtils.stringifyException(t) );
@@ -1412,4 +1338,11 @@
       printUsageAndExit();
     }
   }
-}
+  
+  /**
+   * @param args
+   */
+  public static void main(String [] args) {
+    doMain(args, HRegionServer.class);
+  }
+}
\ No newline at end of file



Mime
View raw message