hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r535970 [2/4] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/bin/ src/contrib/hbase/conf/ src/contrib/hbase/src/java/org/apache/hadoop/hbase/ src/contrib/hbase/src/test/org/apache/hadoop/hbase/
Date Mon, 07 May 2007 19:58:57 GMT
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Mon May  7 12:58:53 2007
@@ -23,34 +23,38 @@
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.ipc.*;
+import org.apache.hadoop.util.StringUtils;
 
 import java.io.*;
 import java.util.*;
 
-/*******************************************************************************
+/**
  * HMaster is the "master server" for a HBase.
  * There is only one HMaster for a single HBase deployment.
- ******************************************************************************/
-public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface {
+ */
+public class HMaster implements HConstants, HMasterInterface, 
+    HMasterRegionInterface, Runnable {
 
-  public long getProtocolVersion(String protocol, 
-      long clientVersion) throws IOException { 
+  public long getProtocolVersion(String protocol, long clientVersion)
+  throws IOException { 
     if (protocol.equals(HMasterInterface.class.getName())) {
       return HMasterInterface.versionID; 
-    } else if (protocol.equals(HMasterRegionInterface.class.getName())){
+    } else if (protocol.equals(HMasterRegionInterface.class.getName())) {
       return HMasterRegionInterface.versionID;
     } else {
       throw new IOException("Unknown protocol to name node: " + protocol);
     }
   }
 
-  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  private static final Log LOG =
+    LogFactory.getLog(org.apache.hadoop.hbase.HMaster.class.getName());
   
-  private boolean closed;
+  private volatile boolean closed;
   private Path dir;
   private Configuration conf;
   private FileSystem fs;
   private Random rand;
+  private long threadWakeFrequency; 
   private long maxRegionOpenTime;
   
   // The 'msgQueue' is used to assign work to the client processor thread
@@ -67,170 +71,227 @@
   
   private HServerAddress rootRegionLocation;
   
-  //////////////////////////////////////////////////////////////////////////////
-  // The ROOT/META scans, performed each time a meta region comes on-line
-  // Since both have identical structure, we only need one class to get the work
-  // done, however we have to create separate objects for each.
-  //////////////////////////////////////////////////////////////////////////////
+  /**
+   * Columns in the 'meta' ROOT and META tables.
+   */
+  private static final Text METACOLUMNS[] = {
+      COLUMN_FAMILY
+  };
+  
+  static final String MASTER_NOT_RUNNING = "Master not running";
 
   private boolean rootScanned;
   private int numMetaRegions;
-  
+
   /**
-   * How do we know if all regions are assigned?
+   * Base HRegion scanner class. Holds utility common to <code>ROOT</code> and
+   * <code>META</code> HRegion scanners.
    * 
-   * 1. After the initial scan of the root and meta regions, all regions known
-   *    at that time will have been or are in the process of being assigned.
+   * <p>How do we know if all regions are assigned? After the initial scan of
+   * the <code>ROOT</code> and <code>META</code> regions, all regions known at
+   * that time will have been or are in the process of being assigned.</p>
    * 
-   * 2. When a region is split the region server notifies the master of the split
-   *    and the new regions are assigned. But suppose the master loses the split
-   *    message? We need to periodically rescan the root and meta regions.
-   *    
-   *    - If we rescan, any regions that are new but not assigned will have no
-   *      server info. Any regions that are not being served by the same server
-   *      will get re-assigned.
+   * <p>When a region is split the region server notifies the master of the
+   * split and the new regions are assigned. But suppose the master loses the
+   * split message? We need to periodically rescan the <code>ROOT</code> and
+   * <code>META</code> regions.
+   *    <ul>
+   *    <li>If we rescan, any regions that are new but not assigned will have
+   *    no server info. Any regions that are not being served by the same
+   *    server will get re-assigned.</li>
    *      
-   *      - Thus a periodic rescan of the root region will find any new meta
-   *        regions where we missed the meta split message or we failed to detect
-   *        a server death and consequently need to assign the region to a new
-   *        server.
-   *        
-   *      - if we keep track of all the known meta regions, then we can rescan
-   *        them periodically. If we do this then we can detect an regions for
-   *        which we missed a region split message.
+   *    <li>Thus a periodic rescan of the root region will find any new
+   *    <code>META</code> regions where we missed the <code>META</code> split
+   *    message or we failed to detect a server death and consequently need to
+   *    assign the region to a new server.</li>
    *        
-   * Thus just keeping track of all the meta regions permits periodic rescanning
-   * which will detect unassigned regions (new or otherwise) without the need to
-   * keep track of every region.
+   *    <li>if we keep track of all the known <code>META</code> regions, then
+   *    we can rescan them periodically. If we do this then we can detect any
+   *    regions for which we missed a region split message.</li>
+   *    </ul>
+   *    
+   * Thus just keeping track of all the <code>META</code> regions permits
+   * periodic rescanning which will detect unassigned regions (new or
+   * otherwise) without the need to keep track of every region.</p>
    * 
-   * So the root region scanner needs to wake up
-   * 1. when the master receives notification that the root region has been opened.
-   * 2. periodically after the first scan
+   * <p>So the <code>ROOT</code> region scanner needs to wake up:
+   * <ol>
+   * <li>when the master receives notification that the <code>ROOT</code>
+   * region has been opened.</li>
+   * <li>periodically after the first scan</li>
+   * </ol>
    * 
-   * The meta scanner needs to wake up:
-   * 1. when a meta region comes on line
-   * 2. periodically to rescan the known meta regions
+   * The <code>META</code>  scanner needs to wake up:
+   * <ol>
+   * <li>when a <code>META</code> region comes on line</li>
+   * <li>periodically to rescan the known <code>META</code> regions</li>
+   * </ol>
    * 
-   * A meta region is not 'known' until it has been scanned once.
-   *        
+   * <p>A <code>META</code> region is not 'known' until it has been scanned
+   * once.
    */
-  private class RootScanner implements Runnable {
-    private final Text cols[] = {
-      ROOT_COLUMN_FAMILY
-    };
-    private final Text firstRow = new Text();
-    private HRegionInterface rootServer;
+  private abstract class BaseScanner implements Runnable {
+    private final Text FIRST_ROW = new Text();
     
-    public RootScanner() {
-      rootServer = null;
-    }
-    
-    public void run() {
-      while((!closed)) {
-        rootScanned = false;
-        waitForRootRegion();
+    protected boolean scanRegion(final MetaRegion region)
+    throws IOException {
+      boolean scannedRegion = false;
+      HRegionInterface server = null;
+      long scannerId = -1L;
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("scanning meta region " + region.regionName);
+      }
 
-        rootServer = null;
-        long scannerId = -1L;
-        
-        try {
-          rootServer = client.getHRegionConnection(rootRegionLocation);
-          scannerId = rootServer.openScanner(HGlobals.rootRegionInfo.regionName, cols, firstRow);
-          
-        } catch(IOException iex) {
-          try {
-            iex.printStackTrace();
-            if(scannerId != -1L) {
-              rootServer.close(scannerId);
-            }
-            
-          } catch(IOException iex2) {
-          }
-          closed = true;
-          break;
-        }
-        try {
-          LOG.debug("starting root region scan");
+      try {
+        server = client.getHRegionConnection(region.server);
+        scannerId = server.openScanner(region.regionName, METACOLUMNS, FIRST_ROW);
 
-          DataInputBuffer inbuf = new DataInputBuffer();
-          while(true) {
-            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-            HStoreKey key = new HStoreKey();
-            LabelledData[] values = rootServer.next(scannerId, key);
-            if(values.length == 0) {
-              break;
-            }
-            for(int i = 0; i < values.length; i++) {
-              results.put(values[i].getLabel(), values[i].getData().get());
-            }
-            byte[] bytes = results.get(ROOT_COL_REGIONINFO);
-            if(bytes == null || bytes.length == 0) {
-              LOG.fatal("no value for " + ROOT_COL_REGIONINFO);
-              stop();
-            }
-            inbuf.reset(bytes, bytes.length);
-            HRegionInfo info = new HRegionInfo();
-            info.readFields(inbuf);
+        DataInputBuffer inbuf = new DataInputBuffer();
+        while (true) {
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          HStoreKey key = new HStoreKey();
 
-            String serverName = null;
-            bytes = results.get(ROOT_COL_SERVER);
-            if(bytes != null && bytes.length != 0) {
-              serverName = new String(bytes, UTF8_ENCODING);
-            }
-            
-            long startCode = -1L;
-            bytes = results.get(ROOT_COL_STARTCODE);
-            if(bytes != null && bytes.length != 0) {
-              startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
-            }
-    
-            // Note META region to load.
-    
-            HServerInfo storedInfo = null;
-            if(serverName != null) {
-              serversToServerInfo.get(serverName);
-            }
-            if(storedInfo == null
-                || storedInfo.getStartCode() != startCode) {
+          LabelledData[] values = server.next(scannerId, key);
 
-              // The current assignment is no good; load the region.
+          if (values.length == 0) {
+            break;
+          }
 
-              unassignedRegions.put(info.regionName, info);
-              assignAttempts.put(info.regionName, 0L);
+          for (int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
 
-              LOG.debug("region unassigned: " + info.regionName);
-            }
+          HRegionInfo info = getRegionInfo(COL_REGIONINFO, results, inbuf);
+          String serverName = getServerName(COL_SERVER, results);
+          long startCode = getStartCode(COL_STARTCODE, results);
+          
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("row: " + info.toString() + ", server: " + serverName
+                + ", startCode: " + startCode);
+          }
 
+          // Note Region has been assigned.
+          checkAssigned(info, serverName, startCode);
+
+          scannedRegion = true;
+        }
+      } finally {
+        try {
+        if (scannerId != -1L) {
+          server.close(scannerId);
+        }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+        scannerId = -1L;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("scan of meta region " + region.regionName + " complete");
+      }
+      return scannedRegion;
+    }
+    
+    protected HRegionInfo getRegionInfo(final Text key,
+        final TreeMap<Text, byte[]> data, final DataInputBuffer in)
+    throws IOException {
+      byte[] bytes = data.get(key);
+      if (bytes == null || bytes.length == 0) {
+        throw new IOException("no value for " + key);
+      }
+      in.reset(bytes, bytes.length);
+      HRegionInfo info = new HRegionInfo();
+      info.readFields(in);
+      return info;
+    }
+    
+    protected String getServerName(final Text key,
+        final TreeMap<Text, byte[]> data)
+    throws UnsupportedEncodingException {
+      byte [] bytes = data.get(key);
+      String name = (bytes != null && bytes.length != 0)?
+        new String(bytes, UTF8_ENCODING): null;
+      return (name != null)? name.trim(): name;
+    }
+
+    protected long getStartCode(final Text key,
+        final TreeMap<Text, byte[]> data)
+    throws NumberFormatException, UnsupportedEncodingException {
+      long startCode = -1L;
+      byte [] bytes = data.get(key);
+      if(bytes != null && bytes.length != 0) {
+        startCode = Long.valueOf(new String(bytes, UTF8_ENCODING).trim());
+      }
+      return startCode;
+    }
+    
+    protected void checkAssigned(final HRegionInfo info,
+        final String serverName, final long startCode) {
+      HServerInfo storedInfo = null;
+      if(serverName != null) {
+        storedInfo = serversToServerInfo.get(serverName);
+      }
+      if(storedInfo == null || storedInfo.getStartCode() != startCode) {
+        // The current assignment is no good; load the region.
+        unassignedRegions.put(info.regionName, info);
+        assignAttempts.put(info.regionName, 0L);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("region unassigned: " + info.regionName);
+        }
+      }
+    }
+  }
+  
+  /**
+   * Scanner for the <code>ROOT</code> HRegion.
+   */
+  private class RootScanner extends BaseScanner {
+    public void run() {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Running ROOT scanner");
+      }
+      try {
+        while(!closed) {
+          // rootRegionLocation will be filled in when we get an 'open region'
+          // regionServerReport message from the HRegionServer that has been
+          // allocated the ROOT region below.  If we get back false, then
+          // HMaster has closed.
+          if (waitForRootRegionOrClose()) {
+            continue;
+          }
+          rootScanned = false;
+          // Make a MetaRegion instance for ROOT region to pass scanRegion.
+          MetaRegion mr = new MetaRegion();
+          mr.regionName = HGlobals.rootRegionInfo.regionName;
+          mr.server = HMaster.this.rootRegionLocation;
+          mr.startKey = null;
+          if (scanRegion(mr)) {
             numMetaRegions += 1;
           }
-
-        } catch(Exception iex) {
-          iex.printStackTrace();
-          
-        } finally {
+          rootScanned = true;
           try {
-            if(scannerId != -1L) {
-              rootServer.close(scannerId);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("RootScanner going to sleep");
             }
-            
-          } catch(IOException iex2) {
+            Thread.sleep(metaRescanInterval);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("RootScanner woke up");
+            }
+          } catch(InterruptedException e) {
+            // Catch and go around again. If interrupted, it's spurious or we're
+            // being shut down.  Go back up to the while test.
           }
-          scannerId = -1L;
         }
+      } catch(IOException e) {
+        e.printStackTrace();
+        closed = true;
       }
-      rootScanned = true;
-      try {
-        Thread.sleep(metaRescanInterval);
-          
-      } catch(InterruptedException e) {
-      }
+      LOG.debug("ROOT scanner exiting");
     }
   }
   
   private RootScanner rootScanner;
   private Thread rootScannerThread;
   
-  /** Contains information the meta scanner needs to process a "new" meta region */
   private class MetaRegion {
     public HServerAddress server;
     public Text regionName;
@@ -245,165 +306,89 @@
   private boolean allMetaRegionsScanned;
   
   /**
-   * MetaScanner scans a region either in the META table.
+   * MetaScanner scans the <code>META</code> table.
    * 
-   * When a meta server comes on line, a MetaRegion object is queued up by
-   * regionServerReport() and this thread wakes up.
+   * When a <code>META</code> server comes on line, a MetaRegion object is
+   * queued up by regionServerReport() and this thread wakes up.
    *
    * It's important to do this work in a separate thread, or else the blocking 
    * action would prevent other work from getting done.
    */
-  private class MetaScanner implements Runnable {
-    private final Text cols[] = {
-        META_COLUMN_FAMILY
-    };
-    private final Text firstRow = new Text();
-    
-    public MetaScanner() {
-    }
-
-    private void scanRegion(MetaRegion region) {
-      HRegionInterface server = null;
-      long scannerId = -1L;
-      
-      LOG.debug("scanning meta region: " + region.regionName);
-      
-      try {
-        server = client.getHRegionConnection(region.server);
-        scannerId = server.openScanner(region.regionName, cols, firstRow);
-        
-      } catch(IOException iex) {
-        try {
-          if(scannerId != -1L) {
-            server.close(scannerId);
-            scannerId = -1L;
-          }
-          stop();
-          
-        } catch(IOException iex2) {
-        }
-        return;
-      }
-
-      DataInputBuffer inbuf = new DataInputBuffer();
-      try {
-        while(true) {
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-          HStoreKey key = new HStoreKey();
-
-          LabelledData[] values = server.next(scannerId, key);
-
-          if(values.length == 0) {
-            break;
-          }
-
-          for(int i = 0; i < values.length; i++) {
-            results.put(values[i].getLabel(), values[i].getData().get());
-          }
-
-          byte bytes[] = results.get(META_COL_REGIONINFO);
-          if(bytes == null || bytes.length == 0) {
-            LOG.fatal("no value for " + META_COL_REGIONINFO);
-            stop();
-          }
-          inbuf.reset(bytes, bytes.length);
-          HRegionInfo info = new HRegionInfo();
-          info.readFields(inbuf);
-
-          String serverName = null;
-          bytes = results.get(META_COL_SERVER);
-          if(bytes != null && bytes.length != 0) {
-            serverName = new String(bytes, UTF8_ENCODING);
-          }
-          
-          long startCode = -1L;
-          bytes = results.get(META_COL_STARTCODE);
-          if(bytes != null && bytes.length != 0) {
-            startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
-          }
-
-          // Note HRegion to load.
-
-          HServerInfo storedInfo = null;
-          if(serverName != null) {
-            serversToServerInfo.get(serverName);
-          }
-          if(storedInfo == null
-              || storedInfo.getStartCode() != startCode) {
-
-            // The current assignment is no good; load the region.
-
-            unassignedRegions.put(info.regionName, info);
-            assignAttempts.put(info.regionName, 0L);
-            LOG.debug("region unassigned: " + info.regionName);
-          }
-        }
-
-      } catch(Exception iex) {
-        iex.printStackTrace();
-        
-      } finally {
-        try {
-          if(scannerId != -1L) {
-            server.close(scannerId);
-          }
-          
-        } catch(IOException iex2) {
-          iex2.printStackTrace();
-        }
-        scannerId = -1L;
-      }
-    }
-
+  private class MetaScanner extends BaseScanner {
     public void run() {
-      while((!closed)) {
+      while (!closed) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Running META scanner");
+        }
         MetaRegion region = null;
-        
-        while(region == null) {
-          synchronized(metaRegionsToScan) {
-            if(metaRegionsToScan.size() != 0) {
+        while (region == null && !closed) {
+          synchronized (metaRegionsToScan) {
+            if (metaRegionsToScan.size() != 0) {
               region = metaRegionsToScan.remove(0);
             }
-            if(region == null) {
+            if (region == null) {
               try {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("MetaScanner going into wait");
+                }
                 metaRegionsToScan.wait();
-
-              } catch(InterruptedException e) {
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("MetaScanner woke up");
+                }
+              } catch (InterruptedException e) {
+                // Catch and go around again.  We've been woken because there
+                // are new meta regions available or because we are being
+                // shut down.
               }
             }
           }
         }
-     
-        scanRegion(region);
-        if(closed) {
-          break;
-        }
-        knownMetaRegions.put(region.startKey, region);
-        if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
-          LOG.debug("all meta regions scanned");
-          allMetaRegionsScanned = true;
-          metaRegionsScanned();
+        if (closed) {
+          continue;
         }
-
-        do {
-          try {
-            Thread.sleep(metaRescanInterval);
-          
-          } catch(InterruptedException ex) {
-          }
-          if(!allMetaRegionsScanned) {
-            break;                              // A meta region must have split
-          }
-          
-          // Rescan the known meta regions every so often
-          
-          Vector<MetaRegion> v = new Vector<MetaRegion>();
-          v.addAll(knownMetaRegions.values());
-          
-          for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
-            scanRegion(i.next());
+        try {
+          scanRegion(region);
+          knownMetaRegions.put(region.startKey, region);
+          if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("all meta regions scanned");
+            }
+            allMetaRegionsScanned = true;
+            metaRegionsScanned();
           }
-        } while(true);
+
+          do {
+            try {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sleep for meta rescan interval");
+              }
+              Thread.sleep(metaRescanInterval);
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sleep for meta rescan interval");
+              }
+            } catch(InterruptedException ex) {
+              // Catch and go around again.
+            }
+            if(!allMetaRegionsScanned         // A meta region must have split
+                || closed) {                  // We're shutting down
+              break;
+            }
+
+            // Rescan the known meta regions every so often
+            Vector<MetaRegion> v = new Vector<MetaRegion>();
+            v.addAll(knownMetaRegions.values());
+            for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
+              scanRegion(i.next());
+            }
+          } while(true);
+
+        } catch(IOException e) {
+          e.printStackTrace();
+          closed = true;
+        }
+      }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("META scanner exiting");
       }
     }
 
@@ -412,11 +397,19 @@
     }
     
     public synchronized void waitForMetaScan() {
-      while(!allMetaRegionsScanned) {
+      while(!closed && !allMetaRegionsScanned) {
         try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Wait for all meta regions scanned");
+          }
           wait();
-          
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Wake from wait for all meta regions scanned");
+          }
         } catch(InterruptedException e) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Wake from wait for all meta regions scanned (IE)");
+          }
         }
       }
     }
@@ -425,11 +418,6 @@
   private MetaScanner metaScanner;
   private Thread metaScannerThread;
   
-  // Client processing
-  
-  private ClientProcessor clientProcessor;
-  private Thread clientProcessorThread;
-
   // The 'unassignedRegions' table maps from a region name to a HRegionInfo record,
   // which includes the region's table, its id, and its start/end keys.
   //
@@ -449,9 +437,8 @@
 
   private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
 
-  // 'serversToServerInfo' maps from the String to its HServerInfo
-
-  private SortedMap<String, HServerInfo> serversToServerInfo;
+  private SortedMap<String, HServerInfo> serversToServerInfo =
+    Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
 
   /** Build the HMaster out of a raw configuration item. */
   public HMaster(Configuration conf) throws IOException {
@@ -468,7 +455,8 @@
    * 
    * @throws IOException
    */
-  public HMaster(Path dir, HServerAddress address, Configuration conf) throws IOException {
+  public HMaster(Path dir, HServerAddress address, Configuration conf)
+  throws IOException {
     this.closed = true;
     this.dir = dir;
     this.conf = conf;
@@ -481,12 +469,10 @@
       fs.mkdirs(dir);
     }
 
-    Path rootRegionDir = HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
+    Path rootRegionDir =
+      HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
     if(! fs.exists(rootRegionDir)) {
-      LOG.debug("bootstrap: creating root and meta regions");
-      
-      // Bootstrap! Need to create the root region and the first meta region.
-
+      LOG.info("bootstrap: creating ROOT and first META regions");
       try {
         HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L);
         HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L);
@@ -501,13 +487,16 @@
       }
     }
 
+    this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
     this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
     this.msgQueue = new Vector<PendingOperation>();
-    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 30 * 1000), 
+    this.serverLeases = new Leases(
+        conf.getLong("hbase.master.lease.period", 30 * 1000), 
         conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
     
     this.server = RPC.getServer(this, address.getBindAddress(),
-        address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+        address.getPort(), conf.getInt("hbase.regionserver.handler.count", 10),
+        false, conf);
 
     //  The rpc-server port can be ephemeral... ensure we have the correct info
     
@@ -539,11 +528,6 @@
     this.metaScanner = new MetaScanner();
     this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
 
-    // Process updates to meta asychronously
-    
-    this.clientProcessor = new ClientProcessor();
-    this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor");
-    
     this.unassignedRegions = 
       Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
     
@@ -558,75 +542,118 @@
       Collections.synchronizedSortedMap(
           new TreeMap<String, TreeMap<Text, HRegionInfo>>());
     
-    this.serversToServerInfo = 
-      Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
-    
     // We're almost open for business
     
     this.closed = false;
     
+    LOG.info("HMaster initialized on " + address.toString());
+  }
+  
+  public boolean isMasterRunning() {
+    return !closed;
+  }
+
+  public void run() {
+    Thread.currentThread().setName("HMaster");
     try { 
       // Start things up
-    
       this.rootScannerThread.start();
       this.metaScannerThread.start();
-      this.clientProcessorThread.start();
 
       // Start the server last so everything else is running before we start
       // receiving requests
-    
       this.server.start();
-      
     } catch(IOException e) {
-      // Something happend during startup. Shut things down.
-      
+      // Something happened during startup. Shut things down.
       this.closed = true;
-      throw e;
+      e.printStackTrace();
     }
-    LOG.info("HMaster started");
-  }
-
-  /** Turn off the HMaster.  Turn off all the threads, close files, etc. */
-  public void stop() throws IOException {
-    closed = true;
 
-    try {
-      client.close();
-      
-    } catch(IOException iex) {
+    // Main processing loop
+    for(PendingOperation op = null; !closed; ) {
+      synchronized(msgQueue) {
+        while(msgQueue.size() == 0 && serversToServerInfo.size() != 0) {
+          try {
+            msgQueue.wait(threadWakeFrequency);
+          } catch(InterruptedException iex) {
+          }
+        }
+        if(msgQueue.size() == 0 || closed) {
+          continue;
+        }
+        op = msgQueue.remove(msgQueue.size()-1);
+      }
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Processing " + op.toString());
+        }
+        op.process();
+      } catch(Exception ex) {
+        msgQueue.insertElementAt(op, 0);
+      }
     }
-    server.stop();
-    LOG.info("shutting down HMaster");
-  }
-  
-  /** returns the HMaster server address */
-  public HServerAddress getMasterAddress() {
-    return address;
-  }
 
-  /** Call this to wait for everything to finish */
-  public void join() {
+    /*
+     * Clean up and close up shop
+     */
+
+    // Wake other threads so they notice the close
+    
+    rootScannerThread.interrupt();
+    metaScannerThread.interrupt();
+    server.stop();                              // Stop server
+    serverLeases.close();                       // Turn off the lease monitor
     try {
-      server.join();
-      
-    } catch(InterruptedException iex) {
+      fs.close();
+      client.close();                           // Shut down the client
+    } catch(IOException iex) {
+      // Print if closing ever fails with an IOException (Just for kicks.
+      // Remove if it ever happens).
+      iex.printStackTrace();
     }
+    
+    // Join up with all threads
+    
     try {
-      clientProcessorThread.join();
-      
+      // Wait for the root scanner to finish.
+      rootScannerThread.join();
     } catch(Exception iex) {
+      // Print if ever there is an interrupt (Just for kicks. Remove if it
+      // ever happens).
+      iex.printStackTrace();
     }
     try {
+      // Join the thread till it finishes.
       metaScannerThread.join();
-      
     } catch(Exception iex) {
+      // Print if ever there is an interrupt (Just for kicks. Remove if it
+      // ever happens).
+      iex.printStackTrace();
     }
     try {
-      rootScannerThread.join();
-      
-    } catch(Exception iex) {
+      // Join until it's finished.  TODO: Maybe do in parallel in its own thread
+      // as is done in TaskTracker if it's taking a long time to go down.
+      server.join();
+    } catch(InterruptedException iex) {
+      // Print if ever there is an interrupt (Just for kicks. Remove if it
+      // ever happens).
+      iex.printStackTrace();
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("HMaster main thread exiting");
+    }
+  }
+  
+  /** 
+   * Turn off the HMaster.  Sets a flag so that the main thread knows to shut
+   * things down in an orderly fashion.
+   */
+  public void shutdown() throws IOException {
+    closed = true;
+    synchronized(msgQueue) {
+      msgQueue.clear();                         // Empty the queue
+      msgQueue.notifyAll();                     // Wake main thread
     }
-    LOG.info("HMaster stopped");
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -635,20 +662,19 @@
   
   /** HRegionServers call this method upon startup. */
   public void regionServerStartup(HServerInfo serverInfo) throws IOException {
-    String server = serverInfo.getServerAddress().toString();
+    String server = serverInfo.getServerAddress().toString().trim();
     HServerInfo storedInfo = null;
 
-    LOG.debug("received start message from: " + server);
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("received start message from: " + server);
+    }
     
     // If we get the startup message but there's an old server by that
     // name, then we can timeout the old one right away and register
     // the new one.
-    
-    storedInfo = serversToServerInfo.get(server);
-
-    if(storedInfo != null) {
-      serversToServerInfo.remove(server);
+    storedInfo = serversToServerInfo.remove(server);
 
+    if(storedInfo != null && !closed) {
       synchronized(msgQueue) {
         msgQueue.add(new PendingServerShutdown(storedInfo));
         msgQueue.notifyAll();
@@ -659,19 +685,33 @@
 
     serversToServerInfo.put(server, serverInfo);
 
-    Text serverLabel = new Text(server);        
-    serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
+    if(!closed) {
+      Text serverLabel = new Text(server);        
+      serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
+    }
   }
 
   /** HRegionServers call this method repeatedly. */
   public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException {
-    String server = serverInfo.getServerAddress().toString();
+    String server = serverInfo.getServerAddress().toString().trim();
+
+    if(closed) {
+      // We're shutting down. Tell the server to go away.
+      serversToServerInfo.remove(server);
+
+      HMsg returnMsgs[] = {
+          new HMsg(HMsg.MSG_REGIONSERVER_STOP)
+      };
+      return returnMsgs;
+    }
 
     HServerInfo storedInfo = serversToServerInfo.get(server);
 
     if(storedInfo == null) {
 
-      LOG.debug("received server report from unknown server: " + server);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("received server report from unknown server: " + server);
+      }
 
       // The HBaseMaster may have been restarted.
       // Tell the RegionServer to start over and call regionServerStartup()
@@ -691,10 +731,13 @@
       //
       // The answer is to ask A to shut down for good.
 
-      LOG.debug("region server race condition detected: " + server);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("region server race condition detected: " + server);
+      }
 
-      HMsg returnMsgs[] = new HMsg[1];
-      returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
+      HMsg returnMsgs[] = {
+          new HMsg(HMsg.MSG_REGIONSERVER_STOP)
+      };
       return returnMsgs;
 
     } else {
@@ -720,7 +763,9 @@
     
     // Process the kill list
     
-    TreeMap<Text, HRegionInfo> regionsToKill = killList.get(info.toString());
+    TreeMap<Text, HRegionInfo> regionsToKill =
+      killList.remove(info.getServerAddress().toString());
+    
     if(regionsToKill != null) {
       for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
           i.hasNext(); ) {
@@ -741,8 +786,10 @@
 
         if(regionInfo == null) {
 
-          LOG.debug("region server " + info.getServerAddress().toString()
-              + "should not have opened region " + region.regionName);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("region server " + info.getServerAddress().toString()
+                + "should not have opened region " + region.regionName);
+          }
 
           // This Region should not have been opened.
           // Ask the server to shut it down, but don't report it as closed.  
@@ -753,8 +800,10 @@
 
         } else {
 
-          LOG.debug(info.getServerAddress().toString() + " serving "
-              + region.regionName);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug(info.getServerAddress().toString() + " serving "
+                + region.regionName);
+          }
 
           // Remove from unassigned list so we don't assign it to someone else
 
@@ -797,8 +846,10 @@
         break;
 
       case HMsg.MSG_REPORT_CLOSE:
-        LOG.debug(info.getServerAddress().toString() + " no longer serving "
-            + region.regionName);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(info.getServerAddress().toString() + " no longer serving "
+              + region.regionName);
+        }
 
         if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
           rootRegionLocation = null;
@@ -835,7 +886,9 @@
         break;
 
       case HMsg.MSG_NEW_REGION:
-        LOG.debug("new region " + region.regionName);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("new region " + region.regionName);
+        }
 
         if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
           // A meta region has split.
@@ -865,15 +918,17 @@
       long now = System.currentTimeMillis();
 
       for(Iterator<Text> it = unassignedRegions.keySet().iterator();
-      it.hasNext(); ) {
+          it.hasNext(); ) {
 
         Text curRegionName = it.next();
         HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
         long assignedTime = assignAttempts.get(curRegionName);
 
         if(now - assignedTime > maxRegionOpenTime) {
-          LOG.debug("assigning region " + regionInfo.regionName + " to server "
-              + info.getServerAddress().toString());
+          if(LOG.isDebugEnabled()) {
+            LOG.debug("assigning region " + regionInfo.regionName + " to server "
+                + info.getServerAddress().toString());
+          }
 
           returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
 
@@ -893,51 +948,55 @@
     notifyAll();
   }
   
-  private synchronized void waitForRootRegion() {
-    while(rootRegionLocation == null) {
+  /**
+   * Wait until <code>rootRegionLocation</code> has been set or until the
+   * <code>closed</code> flag has been set.
+   * @return True if <code>rootRegionLocation</code> is still null on return.
+   */
+  private synchronized boolean waitForRootRegionOrClose() {
+    while (!closed && rootRegionLocation == null) {
       try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wait for root region (or close)");
+        }
         wait();
-        
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wake from wait for root region (or close)");
+        }
       } catch(InterruptedException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wake from wait for root region (or close) (IE)");
+        }
       }
     }
+    return this.rootRegionLocation == null;
   }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // Some internal classes to manage msg-passing and client operations
-  //////////////////////////////////////////////////////////////////////////////
   
-  private class ClientProcessor implements Runnable {
-    public ClientProcessor() {
-    }
-    
-    public void run() {
-      while(!closed) {
-        PendingOperation op = null;
-        
-        synchronized(msgQueue) {
-          while(msgQueue.size() == 0) {
-            try {
-              msgQueue.wait();
-              
-            } catch(InterruptedException iex) {
-            }
-          }
-          op = msgQueue.remove(msgQueue.size()-1);
+  private synchronized void waitForRootRegion() {
+    while (rootRegionLocation == null) {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wait for root region");
         }
-        try {
-          op.process();
-          
-        } catch(Exception ex) {
-          msgQueue.insertElementAt(op, 0);
+        wait();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wake from wait for root region");
+        }
+      } catch(InterruptedException e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Wake from wait for root region (IE)");
         }
       }
     }
   }
 
+  //////////////////////////////////////////////////////////////////////////////
+  // Some internal classes to manage msg-passing and client operations
+  //////////////////////////////////////////////////////////////////////////////
+  
   private abstract class PendingOperation {
     protected final Text[] columns = {
-        META_COLUMN_FAMILY
+        COLUMN_FAMILY
     };
     protected final Text startRow = new Text();
     protected long clientId;
@@ -981,16 +1040,25 @@
             results.put(values[i].getLabel(), values[i].getData().get());
           }
           
-          String serverName = 
-            new String(results.get(META_COL_SERVER), UTF8_ENCODING);
+          byte[] bytes = results.get(COL_SERVER); 
+          String serverName = null;
+          if(bytes == null || bytes.length == 0) {
+            // No server
+            continue;
+          }
+          serverName = new String(bytes, UTF8_ENCODING);
 
           if(deadServer.compareTo(serverName) != 0) {
             // This isn't the server you're looking for - move along
             continue;
           }
 
-          long startCode = 
-            Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
+          bytes = results.get(COL_STARTCODE);
+          if(bytes == null || bytes.length == 0) {
+            // No start code
+            continue;
+          }
+          long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
 
           if(oldStartCode != startCode) {
             // Close but no cigar
@@ -999,12 +1067,17 @@
 
           // Bingo! Found it.
 
-          byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
-          inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          bytes = results.get(COL_REGIONINFO);
+          if(bytes == null || bytes.length == 0) {
+            throw new IOException("no value for " + COL_REGIONINFO);
+          }
+          inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
 
-          LOG.debug(serverName + " was serving " + info.regionName);
+          if(LOG.isDebugEnabled()) {
+            LOG.debug(serverName + " was serving " + info.regionName);
+          }
           
           // Add to our to do lists
 
@@ -1029,8 +1102,8 @@
 
       for(int i = 0; i < toDoList.size(); i++) {
         long lockid = server.startUpdate(regionName, clientId, toDoList.get(i).getRow());
-        server.delete(regionName, clientId, lockid, META_COL_SERVER);
-        server.delete(regionName, clientId, lockid, META_COL_STARTCODE);
+        server.delete(regionName, clientId, lockid, COL_SERVER);
+        server.delete(regionName, clientId, lockid, COL_STARTCODE);
         server.commit(regionName, clientId, lockid);
       }
 
@@ -1049,7 +1122,9 @@
     }
     
     public void process() throws IOException {
-      LOG.debug("server shutdown: " + deadServer);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("server shutdown: " + deadServer);
+      }
 
       // Scan the ROOT region
       
@@ -1107,30 +1182,40 @@
 
       metaScanner.waitForMetaScan();
       
-      LOG.debug("region closed: " + regionInfo.regionName);
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("region closed: " + regionInfo.regionName);
+      }
 
       // Mark the Region as unavailable in the appropriate meta table
 
       Text metaRegionName;
       HRegionInterface server;
-      if(rootRegion) {
+      if (rootRegion) {
         metaRegionName = HGlobals.rootRegionInfo.regionName;
         waitForRootRegion();            // Make sure root region available
         server = client.getHRegionConnection(rootRegionLocation);
         
       } else {
-        Text metaStartRow = knownMetaRegions.headMap(regionInfo.regionName).lastKey();
-        MetaRegion r = knownMetaRegions.get(metaStartRow);
+        MetaRegion r = null;
+        if(knownMetaRegions.containsKey(regionInfo.regionName)) {
+          r = knownMetaRegions.get(regionInfo.regionName);
+          
+        } else {
+          r = knownMetaRegions.get(
+              knownMetaRegions.headMap(regionInfo.regionName).lastKey());
+        }
         metaRegionName = r.regionName;
         server = client.getHRegionConnection(r.server);
       }
       long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
-      server.delete(metaRegionName, clientId, lockid, META_COL_SERVER);
-      server.delete(metaRegionName, clientId, lockid, META_COL_STARTCODE);
+      server.delete(metaRegionName, clientId, lockid, COL_SERVER);
+      server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
       server.commit(metaRegionName, clientId, lockid);
       
       if(reassignRegion) {
-        LOG.debug("reassign region: " + regionInfo.regionName);
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("reassign region: " + regionInfo.regionName);
+        }
         
         unassignedRegions.put(regionInfo.regionName, regionInfo);
         assignAttempts.put(regionInfo.regionName, 0L);
@@ -1169,6 +1254,7 @@
             String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
         
       } catch(UnsupportedEncodingException e) {
+        e.printStackTrace();
       }
 
     }
@@ -1180,7 +1266,10 @@
 
       metaScanner.waitForMetaScan();
       
-      LOG.debug(regionName + " open on " + serverAddress.toString());
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(regionName + " open on "
+            + new String(serverAddress.get(), UTF8_ENCODING));
+      }
 
       // Register the newly-available Region's location.
 
@@ -1192,14 +1281,23 @@
         server = client.getHRegionConnection(rootRegionLocation);
         
       } else {
-        Text metaStartRow = knownMetaRegions.headMap(regionName).lastKey();
-        MetaRegion r = knownMetaRegions.get(metaStartRow);
+        MetaRegion r = null;
+        if(knownMetaRegions.containsKey(regionName)) {
+          r = knownMetaRegions.get(regionName);
+          
+        } else {
+          r = knownMetaRegions.get(
+              knownMetaRegions.headMap(regionName).lastKey());
+        }
         metaRegionName = r.regionName;
         server = client.getHRegionConnection(r.server);
       }
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("updating row " + regionName + " in table " + metaRegionName);
+      }
       long lockid = server.startUpdate(metaRegionName, clientId, regionName);
-      server.put(metaRegionName, clientId, lockid, META_COL_SERVER, serverAddress);
-      server.put(metaRegionName, clientId, lockid, META_COL_STARTCODE, startCode);
+      server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
+      server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
       server.commit(metaRegionName, clientId, lockid);
     }
   }
@@ -1208,7 +1306,15 @@
   // HMasterInterface
   //////////////////////////////////////////////////////////////////////////////
   
+  /** returns the HMaster server address */
+  public HServerAddress getMasterAddress() {
+    return address;
+  }
+
   public void createTable(HTableDescriptor desc) throws IOException {
+    if (!isMasterRunning()) {
+      throw new IllegalStateException(MASTER_NOT_RUNNING);
+    }
     HRegionInfo newRegion = new HRegionInfo(rand.nextLong(), desc, null, null);
     
     // We can not access any meta region if they have not already been assigned
@@ -1218,13 +1324,19 @@
     
     // 1. Check to see if table already exists
 
-    Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey();
-    MetaRegion m = knownMetaRegions.get(metaStartRow);
+    MetaRegion m = null;
+    if(knownMetaRegions.containsKey(newRegion.regionName)) {
+      m = knownMetaRegions.get(newRegion.regionName);
+      
+    } else {
+      m = knownMetaRegions.get(
+          knownMetaRegions.headMap(newRegion.regionName).lastKey());
+    }
     Text metaRegionName = m.regionName;
     HRegionInterface server = client.getHRegionConnection(m.server);
 
 
-    BytesWritable bytes = server.get(metaRegionName, desc.getName(), META_COL_REGIONINFO);
+    BytesWritable bytes = server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
     if(bytes != null && bytes.getSize() != 0) {
       byte[] infoBytes = bytes.get();
       DataInputBuffer inbuf = new DataInputBuffer();
@@ -1250,7 +1362,7 @@
 
     long clientId = rand.nextLong();
     long lockid = server.startUpdate(metaRegionName, clientId, regionName);
-    server.put(metaRegionName, clientId, lockid, META_COL_REGIONINFO, 
+    server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, 
         new BytesWritable(byteValue.toByteArray()));
     server.commit(metaRegionName, clientId, lockid);
     
@@ -1263,7 +1375,9 @@
     unassignedRegions.put(regionName, info);
     assignAttempts.put(regionName, 0L);
     
-    LOG.debug("created table " + desc.getName());
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("created table " + desc.getName());
+    }
   }
 
   /**
@@ -1283,8 +1397,9 @@
     Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
     fs.mkdirs(regionDir);
 
-    return new HRegion(dir, new HLog(fs, new Path(regionDir, "log"), conf), fs,
-        conf, info, null, null);
+    return new HRegion(dir,
+      new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf),
+      fs, conf, info, null, null);
   }
   
   /**
@@ -1307,48 +1422,65 @@
 
     table.getRegionInfo().write(s);
     
-    meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray());
+    meta.put(writeid, COL_REGIONINFO, new BytesWritable(bytes.toByteArray()));
     
     meta.commit(writeid);
   }
   
   public void deleteTable(Text tableName) throws IOException {
-    Text[] columns = {
-        META_COLUMN_FAMILY
-    };
+    if (!isMasterRunning()) {
+      throw new IllegalStateException(MASTER_NOT_RUNNING);
+    }
     
     // We can not access any meta region if they have not already been assigned
     // and scanned.
 
     metaScanner.waitForMetaScan();
     
-    for(Iterator<MetaRegion> it = knownMetaRegions.tailMap(tableName).values().iterator();
+    Text firstMetaRegion = null;
+    if(knownMetaRegions.size() == 1) {
+      firstMetaRegion = knownMetaRegions.firstKey();
+      
+    } else if(knownMetaRegions.containsKey(tableName)) {
+      firstMetaRegion = tableName;
+      
+    } else {
+      firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
+    }
+    for(Iterator<MetaRegion> it =
+      knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
         it.hasNext(); ) {
 
       // Find all the regions that make up this table
       
-      long clientId = rand.nextLong();
       MetaRegion m = it.next();
       HRegionInterface server = client.getHRegionConnection(m.server);
+      Vector<Text> rowsToDelete = new Vector<Text>();
+
       long scannerId = -1L;
       try {
-        scannerId = server.openScanner(m.regionName, columns, tableName);
+        scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
         
-        Vector<Text> rowsToDelete = new Vector<Text>();
         
         DataInputBuffer inbuf = new DataInputBuffer();
+        byte[] bytes;
         while(true) {
           LabelledData[] values = null;
           HStoreKey key = new HStoreKey();
           values = server.next(scannerId, key);
-          if(values.length == 0) {
+          if(values == null || values.length == 0) {
             break;
           }
           TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
           for(int i = 0; i < values.length; i++) {
-            results.put(values[i].getLabel(), values[i].getData().get());
+            bytes = new byte[values[i].getData().getSize()];
+            System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
+            results.put(values[i].getLabel(), bytes);
+          }
+          bytes = results.get(COL_REGIONINFO);
+          if(bytes == null || bytes.length == 0) {
+            break;
           }
-          byte bytes[] = results.get(META_COL_REGIONINFO);
           inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
@@ -1356,33 +1488,37 @@
           if(info.tableDesc.getName().compareTo(tableName) > 0) {
             break;                      // Beyond any more entries for this table
           }
+          
+          rowsToDelete.add(info.regionName);
 
           // Is it being served?
           
-          String serverName =
-            new String(results.get(META_COL_SERVER), UTF8_ENCODING);
-
-          long startCode = 
-            Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
+          bytes = results.get(COL_SERVER);
+          if(bytes != null && bytes.length != 0) {
+            String serverName = new String(bytes, UTF8_ENCODING);
+            
+            bytes = results.get(COL_STARTCODE);
+            if(bytes != null && bytes.length != 0) {
+              long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
 
-          HServerInfo s = serversToServerInfo.get(serverName);
-          if(s != null && s.getStartCode() == startCode) {
+              HServerInfo s = serversToServerInfo.get(serverName);
+              if(s != null && s.getStartCode() == startCode) {
 
-            // It is being served. Tell the server to stop it and not report back
+                // It is being served.
+                // Tell the server to stop it and not report back.
 
-            TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
-            if(regionsToKill == null) {
-              regionsToKill = new TreeMap<Text, HRegionInfo>();
+                TreeMap<Text, HRegionInfo> regionsToKill =
+                  killList.get(serverName);
+                
+                if(regionsToKill == null) {
+                  regionsToKill = new TreeMap<Text, HRegionInfo>();
+                }
+                regionsToKill.put(info.regionName, info);
+                killList.put(serverName, regionsToKill);
+              }
             }
-            regionsToKill.put(info.regionName, info);
-            killList.put(serverName, regionsToKill);
           }
         }
-        for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
-          long lockid = server.startUpdate(m.regionName, clientId, row.next());
-          server.delete(m.regionName, clientId, lockid, columns[0]);
-          server.commit(m.regionName, clientId, lockid);
-        }
       } catch(IOException e) {
         e.printStackTrace();
         
@@ -1398,8 +1534,27 @@
         }
         scannerId = -1L;
       }
+      for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
+        Text rowName = row.next();
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("deleting columns in row: " + rowName);
+        }
+        try {
+          long clientId = rand.nextLong();
+          long lockid = server.startUpdate(m.regionName, clientId, rowName);
+          server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
+          server.delete(m.regionName, clientId, lockid, COL_SERVER);
+          server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
+          server.commit(m.regionName, clientId, lockid);
+          
+        } catch(Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("deleted table: " + tableName);
     }
-    LOG.debug("deleted table: " + tableName);
   }
   
   public HServerAddress findRootRegion() {
@@ -1418,7 +1573,9 @@
     }
     
     public void leaseExpired() {
-      LOG.debug(server + " lease expired");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug(server + " lease expired");
+      }
       
       HServerInfo storedInfo = serversToServerInfo.remove(server);
       synchronized(msgQueue) {
@@ -1428,30 +1585,53 @@
     }
   }
 
-  private static void printUsage() {
+  private static void printUsageAndExit() {
     System.err.println("Usage: java org.apache.hbase.HMaster " +
-        "[--bind=hostname:port]");
+        "[--bind=hostname:port] start|stop");
+    System.exit(0);
   }
   
   public static void main(String [] args) throws IOException {
+    if (args.length < 1) {
+      printUsageAndExit();
+    }
+    
     Configuration conf = new HBaseConfiguration();
     
     // Process command-line args. TODO: Better cmd-line processing
     // (but hopefully something not as painful as cli options).
+    final String addressArgKey = "--bind=";
     for (String cmd: args) {
-      if (cmd.equals("-h") || cmd.startsWith("--h")) {
-        printUsage();
-        return;
+      if (cmd.startsWith(addressArgKey)) {
+        conf.set(MASTER_ADDRESS, cmd.substring(addressArgKey.length()));
+        continue;
       }
       
-      final String addressArgKey = "--bind=";
-      if (cmd.startsWith(addressArgKey)) {
-        conf.set(MASTER_ADDRESS,
-            cmd.substring(addressArgKey.length()));
+      if (cmd.equals("start")) {
+        try {
+          (new Thread(new HMaster(conf))).start();
+        } catch (Throwable t) {
+          LOG.error( "Can not start master because "+
+              StringUtils.stringifyException(t) );
+          System.exit(-1);
+        }
+        break;
       }
+      
+      if (cmd.equals("stop")) {
+        try {
+          HClient client = new HClient(conf);
+          client.shutdown();
+        } catch (Throwable t) {
+          LOG.error( "Can not stop master because " +
+              StringUtils.stringifyException(t) );
+          System.exit(-1);
+        }
+        break;
+      }
+      
+      // Print out usage if we get to here.
+      printUsageAndExit();
     }
-    
-    new HMaster(conf);
   }
-}
-
+}
\ No newline at end of file

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Mon May  7 12:58:53 2007
@@ -20,10 +20,11 @@
 
 import java.io.IOException;
 
-/*******************************************************************************
- * Clients interact with the HMasterInterface to gain access to meta-level HBase
- * functionality, like finding an HRegionServer and creating/destroying tables.
- ******************************************************************************/
+/**
+ * Clients interact with the HMasterInterface to gain access to meta-level
+ * HBase functionality, like finding an HRegionServer and creating/destroying
+ * tables.
+ */
 public interface HMasterInterface extends VersionedProtocol {
   public static final long versionID = 1L; // initial version
 
@@ -33,6 +34,11 @@
 
   public void createTable(HTableDescriptor desc) throws IOException;
   public void deleteTable(Text tableName) throws IOException;
+  
+  /**
+   * Shut down an HBase cluster.
+   */
+  public void shutdown() throws IOException;
 
   //////////////////////////////////////////////////////////////////////////////
   // These are the method calls of last resort when trying to find an HRegion

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Mon May  7 12:58:53 2007
@@ -75,11 +75,15 @@
         throw new IOException("Snapshot in progress!");
       }
       if(memcache.size() == 0) {
-        LOG.debug("memcache empty. Skipping snapshot");
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("memcache empty. Skipping snapshot");
+        }
         return retval;
       }
 
-      LOG.debug("starting memcache snapshot");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("starting memcache snapshot");
+      }
       
       retval.memcacheSnapshot = memcache;
       this.snapshot = memcache;
@@ -87,7 +91,9 @@
       memcache = new TreeMap<HStoreKey, BytesWritable>();
       retval.sequenceId = log.startCacheFlush();
       
-      LOG.debug("memcache snapshot complete");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("memcache snapshot complete");
+      }
       
       return retval;
       
@@ -108,7 +114,9 @@
       if(snapshot == null) {
         throw new IOException("Snapshot not present!");
       }
-      LOG.debug("deleting snapshot");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("deleting snapshot");
+      }
       
       for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator(); 
           it.hasNext(); ) {
@@ -121,7 +129,9 @@
       }
       this.snapshot = null;
       
-      LOG.debug("snapshot deleted");
+      if(LOG.isDebugEnabled()) {
+        LOG.debug("snapshot deleted");
+      }
       
     } finally {
       this.locker.writeLock().unlock();
@@ -133,15 +143,15 @@
    *
    * Operation uses a write lock.
    */
-  public void add(Text row, TreeMap<Text, byte[]> columns, long timestamp) {
+  public void add(Text row, TreeMap<Text, BytesWritable> columns, long timestamp) {
     this.locker.writeLock().lock();
     try {
       for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
         Text column = it.next();
-        byte[] val = columns.get(column);
+        BytesWritable val = columns.get(column);
 
         HStoreKey key = new HStoreKey(row, column, timestamp);
-        memcache.put(key, new BytesWritable(val));
+        memcache.put(key, val);
       }
       
     } finally {
@@ -154,11 +164,11 @@
    *
    * We only need a readlock here.
    */
-  public byte[][] get(HStoreKey key, int numVersions) {
-    Vector<byte[]> results = new Vector<byte[]>();
+  public BytesWritable[] get(HStoreKey key, int numVersions) {
+    Vector<BytesWritable> results = new Vector<BytesWritable>();
     this.locker.readLock().lock();
     try {
-      Vector<byte[]> result = get(memcache, key, numVersions-results.size());
+      Vector<BytesWritable> result = get(memcache, key, numVersions-results.size());
       results.addAll(0, result);
 
       for(int i = history.size()-1; i >= 0; i--) {
@@ -174,22 +184,22 @@
         return null;
         
       } else {
-        return (byte[][]) results.toArray(new byte[results.size()][]);
+        return results.toArray(new BytesWritable[results.size()]);
       }
       
     } finally {
       this.locker.readLock().unlock();
     }
   }
-
+  
   /**
    * Return all the available columns for the given key.  The key indicates a 
    * row and timestamp, but not a column name.
    *
    * The returned object should map column names to byte arrays (byte[]).
    */
-  public TreeMap<Text, byte[]> getFull(HStoreKey key) throws IOException {
-    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+  public TreeMap<Text, BytesWritable> getFull(HStoreKey key) throws IOException {
+    TreeMap<Text, BytesWritable> results = new TreeMap<Text, BytesWritable>();
     this.locker.readLock().lock();
     try {
       internalGetFull(memcache, key, results);
@@ -205,7 +215,7 @@
   }
   
   void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, 
-      TreeMap<Text, byte[]> results) {
+      TreeMap<Text, BytesWritable> results) {
     
     SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);
     
@@ -216,7 +226,7 @@
       if(results.get(itCol) == null
           && key.matchesWithoutColumn(itKey)) {
         BytesWritable val = tailMap.get(itKey);
-        results.put(itCol, val.get());
+        results.put(itCol, val);
         
       } else if(key.getRow().compareTo(itKey.getRow()) > 0) {
         break;
@@ -233,8 +243,8 @@
    * TODO - This is kinda slow.  We need a data structure that allows for 
    * proximity-searches, not just precise-matches.
    */    
-  Vector<byte[]> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
-    Vector<byte[]> result = new Vector<byte[]>();
+  Vector<BytesWritable> get(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, int numVersions) {
+    Vector<BytesWritable> result = new Vector<BytesWritable>();
     HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
     SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
 
@@ -242,7 +252,7 @@
       HStoreKey itKey = it.next();
       
       if(itKey.matchesRowCol(curKey)) {
-        result.add(tailMap.get(itKey).get());
+        result.add(tailMap.get(itKey));
         curKey.setVersion(itKey.getTimestamp() - 1);
       }
       
@@ -256,7 +266,7 @@
   /**
    * Return a scanner over the keys in the HMemcache
    */
-  public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
+  public HInternalScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
       throws IOException {
     
     return new HMemcacheScanner(timestamp, targetCols, firstRow);
@@ -280,13 +290,14 @@
       locker.readLock().lock();
       try {
         this.backingMaps = new TreeMap[history.size() + 1];
-        int i = 0;
-        for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
-            it.hasNext(); ) {
-          
-          backingMaps[i++] = it.next();
+        
+        //NOTE: Since we iterate through the backing maps from 0 to n, we need
+        //      to put the memcache first, the newest history second, ..., etc.
+        
+        backingMaps[0] = memcache;
+        for(int i = history.size() - 1; i > 0; i--) {
+          backingMaps[i] = history.elementAt(i);
         }
-        backingMaps[backingMaps.length - 1] = memcache;
 
         this.keyIterators = new Iterator[backingMaps.length];
         this.keys = new HStoreKey[backingMaps.length];
@@ -295,7 +306,7 @@
         // Generate list of iterators
 
         HStoreKey firstKey = new HStoreKey(firstRow);
-        for(i = 0; i < backingMaps.length; i++) {
+        for(int i = 0; i < backingMaps.length; i++) {
           if(firstRow.getLength() != 0) {
             keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
             
@@ -327,7 +338,8 @@
      * @return          - true if this is the first row
      */
     boolean findFirstRow(int i, Text firstRow) {
-      return ((firstRow.getLength() == 0) || (keys[i].getRow().equals(firstRow)));
+      return ((firstRow.getLength() == 0)
+          || (keys[i].getRow().toString().startsWith(firstRow.toString())));
     }
     
     /**

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?view=diff&rev=535970&r1=535969&r2=535970
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Mon May  7 12:58:53 2007
@@ -28,7 +28,7 @@
   public static final byte MSG_REGION_CLOSE = 2;
   public static final byte MSG_REGION_MERGE = 3;
   public static final byte MSG_CALL_SERVER_STARTUP = 4;
-  public static final byte MSG_REGIONSERVER_ALREADY_RUNNING = 5;
+  public static final byte MSG_REGIONSERVER_STOP = 5;
   public static final byte MSG_REGION_CLOSE_WITHOUT_REPORT = 6;
   public static final byte MSG_REGION_CLOSE_AND_DELETE = 7;
 



Mime
View raw message