hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r532083 [2/4] - in /lucene/hadoop/trunk: ./ src/contrib/hbase/conf/ src/contrib/hbase/src/java/org/apache/hadoop/hbase/ src/contrib/hbase/src/test/org/apache/hadoop/hbase/
Date Tue, 24 Apr 2007 21:13:10 GMT
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue Apr 24 14:13:08 2007
@@ -15,6 +15,9 @@
  */
 package org.apache.hadoop.hbase;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
@@ -28,9 +31,21 @@
  * HMaster is the "master server" for a HBase.
  * There is only one HMaster for a single HBase deployment.
  ******************************************************************************/
-public class HMaster extends HGlobals 
-    implements HConstants, HMasterInterface, HMasterRegionInterface {
+public class HMaster implements HConstants, HMasterInterface, HMasterRegionInterface {
 
+  public long getProtocolVersion(String protocol, 
+      long clientVersion) throws IOException { 
+    if (protocol.equals(HMasterInterface.class.getName())) {
+      return HMasterInterface.versionID; 
+    } else if (protocol.equals(HMasterRegionInterface.class.getName())){
+      return HMasterRegionInterface.versionID;
+    } else {
+      throw new IOException("Unknown protocol to name node: " + protocol);
+    }
+  }
+
+  private final Log LOG = LogFactory.getLog(this.getClass().getName());
+  
   private boolean closed;
   private Path dir;
   private Configuration conf;
@@ -44,6 +59,7 @@
   
   private Leases serverLeases;
   private Server server;
+  private HServerAddress address;
   
   private HClient client;
  
@@ -99,92 +115,114 @@
    *        
    */
   private class RootScanner implements Runnable {
+    private final Text cols[] = {
+      ROOT_COLUMN_FAMILY
+    };
+    private final Text firstRow = new Text();
+    private HRegionInterface rootServer;
+    
     public RootScanner() {
+      rootServer = null;
     }
     
     public void run() {
-      Text cols[] = {
-        ROOT_COLUMN_FAMILY
-      };
-      Text firstRow = new Text();
-  
       while((!closed)) {
-        int metaRegions = 0;
-        while(rootRegionLocation == null) {
-          try {
-            rootRegionLocation.wait();
-              
-          } catch(InterruptedException e) {
-          }
-        }
-        
-        HRegionInterface server = null;
-        HScannerInterface scanner = null;
+        rootScanned = false;
+        waitForRootRegion();
+
+        rootServer = null;
+        long scannerId = -1L;
         
         try {
-          server = client.getHRegionConnection(rootRegionLocation);
-          scanner = server.openScanner(rootRegionInfo.regionName, cols, firstRow);
+          rootServer = client.getHRegionConnection(rootRegionLocation);
+          scannerId = rootServer.openScanner(HGlobals.rootRegionInfo.regionName, cols, firstRow);
           
         } catch(IOException iex) {
           try {
-            close();
+            iex.printStackTrace();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
             
           } catch(IOException iex2) {
           }
+          closed = true;
           break;
         }
         try {
-          HStoreKey key = new HStoreKey();
-          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          LOG.debug("starting root region scan");
+
           DataInputBuffer inbuf = new DataInputBuffer();
-  
-          while(scanner.next(key, results)) {
-            byte hRegionInfoBytes[] = results.get(ROOT_COL_REGIONINFO);
-            inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          while(true) {
+            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+            HStoreKey key = new HStoreKey();
+            LabelledData[] values = rootServer.next(scannerId, key);
+            if(values.length == 0) {
+              break;
+            }
+            for(int i = 0; i < values.length; i++) {
+              results.put(values[i].getLabel(), values[i].getData().get());
+            }
+            byte[] bytes = results.get(ROOT_COL_REGIONINFO);
+            if(bytes == null || bytes.length == 0) {
+              LOG.fatal("no value for " + ROOT_COL_REGIONINFO);
+              stop();
+            }
+            inbuf.reset(bytes, bytes.length);
             HRegionInfo info = new HRegionInfo();
             info.readFields(inbuf);
-                        
-            byte serverBytes[] = results.get(ROOT_COL_SERVER);
-            String serverName = new String(serverBytes, UTF8_ENCODING);
-  
-            byte startCodeBytes[] = results.get(ROOT_COL_STARTCODE);
-            long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
-  
+
+            String serverName = null;
+            bytes = results.get(ROOT_COL_SERVER);
+            if(bytes != null && bytes.length != 0) {
+              serverName = new String(bytes, UTF8_ENCODING);
+            }
+            
+            long startCode = -1L;
+            bytes = results.get(ROOT_COL_STARTCODE);
+            if(bytes != null && bytes.length != 0) {
+              startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+            }
+    
             // Note META region to load.
-  
+    
             HServerInfo storedInfo = null;
-            synchronized(serversToServerInfo) {
-              storedInfo = serversToServerInfo.get(serverName);
-              if (storedInfo == null
-                  || storedInfo.getStartCode() != startCode) {
-              
-                // The current assignment is no good; load the region.
-  
-                synchronized(unassignedRegions) {
-                  unassignedRegions.put(info.regionName, info);
-                  assignAttempts.put(info.regionName, 0L);
-                }
-              }
+            if(serverName != null) {
+              serversToServerInfo.get(serverName);
+            }
+            if(storedInfo == null
+                || storedInfo.getStartCode() != startCode) {
+
+              // The current assignment is no good; load the region.
+
+              unassignedRegions.put(info.regionName, info);
+              assignAttempts.put(info.regionName, 0L);
+
+              LOG.debug("region unassigned: " + info.regionName);
             }
-            results.clear();
-            metaRegions += 1;
+
+            numMetaRegions += 1;
           }
-          
+
         } catch(Exception iex) {
+          iex.printStackTrace();
+          
         } finally {
           try {
-            scanner.close();
+            if(scannerId != -1L) {
+              rootServer.close(scannerId);
+            }
             
           } catch(IOException iex2) {
           }
+          scannerId = -1L;
         }
-        rootScanned = true;
-        numMetaRegions = metaRegions;
-        try {
-          Thread.sleep(metaRescanInterval);
+      }
+      rootScanned = true;
+      try {
+        Thread.sleep(metaRescanInterval);
           
-        } catch(InterruptedException e) {
-        }
+      } catch(InterruptedException e) {
       }
     }
   }
@@ -202,8 +240,9 @@
   /** Work for the meta scanner is queued up here */
   private Vector<MetaRegion> metaRegionsToScan;
 
-  private TreeMap<Text, MetaRegion> knownMetaRegions;
-  private Boolean allMetaRegionsScanned;
+  private SortedMap<Text, MetaRegion> knownMetaRegions;
+  
+  private boolean allMetaRegionsScanned;
   
   /**
    * MetaScanner scans a region either in the META table.
@@ -216,7 +255,7 @@
    */
   private class MetaScanner implements Runnable {
     private final Text cols[] = {
-      META_COLUMN_FAMILY
+        META_COLUMN_FAMILY
     };
     private final Text firstRow = new Text();
     
@@ -225,62 +264,94 @@
 
     private void scanRegion(MetaRegion region) {
       HRegionInterface server = null;
-      HScannerInterface scanner = null;
+      long scannerId = -1L;
+      
+      LOG.debug("scanning meta region: " + region.regionName);
       
       try {
         server = client.getHRegionConnection(region.server);
-        scanner = server.openScanner(region.regionName, cols, firstRow);
+        scannerId = server.openScanner(region.regionName, cols, firstRow);
         
       } catch(IOException iex) {
         try {
-          close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+            scannerId = -1L;
+          }
+          stop();
           
         } catch(IOException iex2) {
         }
         return;
       }
+
+      DataInputBuffer inbuf = new DataInputBuffer();
       try {
-        HStoreKey key = new HStoreKey();
-        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-        DataInputBuffer inbuf = new DataInputBuffer();
+        while(true) {
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          HStoreKey key = new HStoreKey();
 
-        while(scanner.next(key, results)) {
-          byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
-          inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+          LabelledData[] values = server.next(scannerId, key);
+
+          if(values.length == 0) {
+            break;
+          }
+
+          for(int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
+
+          byte bytes[] = results.get(META_COL_REGIONINFO);
+          if(bytes == null || bytes.length == 0) {
+            LOG.fatal("no value for " + META_COL_REGIONINFO);
+            stop();
+          }
+          inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
-                      
-          byte serverBytes[] = results.get(META_COL_SERVER);
-          String serverName = new String(serverBytes, UTF8_ENCODING);
 
-          byte startCodeBytes[] = results.get(META_COL_STARTCODE);
-          long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+          String serverName = null;
+          bytes = results.get(META_COL_SERVER);
+          if(bytes != null && bytes.length != 0) {
+            serverName = new String(bytes, UTF8_ENCODING);
+          }
+          
+          long startCode = -1L;
+          bytes = results.get(META_COL_STARTCODE);
+          if(bytes != null && bytes.length != 0) {
+            startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+          }
 
           // Note HRegion to load.
 
           HServerInfo storedInfo = null;
-          synchronized(serversToServerInfo) {
-            storedInfo = serversToServerInfo.get(serverName);
-            if (storedInfo == null
-                || storedInfo.getStartCode() != startCode) {
-            
-              // The current assignment is no good; load the region.
+          if(serverName != null) {
+            serversToServerInfo.get(serverName);
+          }
+          if(storedInfo == null
+              || storedInfo.getStartCode() != startCode) {
 
-              synchronized(unassignedRegions) {
-                unassignedRegions.put(info.regionName, info);
-                assignAttempts.put(info.regionName, 0L);
-              }
-            }
+            // The current assignment is no good; load the region.
+
+            unassignedRegions.put(info.regionName, info);
+            assignAttempts.put(info.regionName, 0L);
+            LOG.debug("region unassigned: " + info.regionName);
           }
-          results.clear();
         }
+
       } catch(Exception iex) {
+        iex.printStackTrace();
+        
       } finally {
         try {
-          scanner.close();
+          if(scannerId != -1L) {
+            server.close(scannerId);
+          }
           
         } catch(IOException iex2) {
+          iex2.printStackTrace();
         }
+        scannerId = -1L;
       }
     }
 
@@ -290,27 +361,28 @@
         
         while(region == null) {
           synchronized(metaRegionsToScan) {
-            if (metaRegionsToScan.size() != 0) {
+            if(metaRegionsToScan.size() != 0) {
               region = metaRegionsToScan.remove(0);
             }
-          }
-          if (region == null) {
-            try {
-              metaRegionsToScan.wait();
-              
-            } catch(InterruptedException e) {
+            if(region == null) {
+              try {
+                metaRegionsToScan.wait();
+
+              } catch(InterruptedException e) {
+              }
             }
           }
         }
      
         scanRegion(region);
-        
-        synchronized(knownMetaRegions) {
-          knownMetaRegions.put(region.startKey, region);
-          if (rootScanned && knownMetaRegions.size() == numMetaRegions) {
-            allMetaRegionsScanned = true;
-            allMetaRegionsScanned.notifyAll();
-          }
+        if(closed) {
+          break;
+        }
+        knownMetaRegions.put(region.startKey, region);
+        if(rootScanned && knownMetaRegions.size() == numMetaRegions) {
+          LOG.debug("all meta regions scanned");
+          allMetaRegionsScanned = true;
+          metaRegionsScanned();
         }
 
         do {
@@ -319,8 +391,8 @@
           
           } catch(InterruptedException ex) {
           }
-          if (!allMetaRegionsScanned) {
-            break;                              // A region must have split
+          if(!allMetaRegionsScanned) {
+            break;                              // A meta region must have split
           }
           
           // Rescan the known meta regions every so often
@@ -328,12 +400,26 @@
           Vector<MetaRegion> v = new Vector<MetaRegion>();
           v.addAll(knownMetaRegions.values());
           
-          for(Iterator<MetaRegion> i = v.iterator(); i.hasNext();) {
+          for(Iterator<MetaRegion> i = v.iterator(); i.hasNext(); ) {
             scanRegion(i.next());
           }
         } while(true);
       }
     }
+
+    private synchronized void metaRegionsScanned() {
+      notifyAll();
+    }
+    
+    public synchronized void waitForMetaScan() {
+      while(!allMetaRegionsScanned) {
+        try {
+          wait();
+          
+        } catch(InterruptedException e) {
+        }
+      }
+    }
   }
 
   private MetaScanner metaScanner;
@@ -350,28 +436,28 @@
   // We fill 'unassignedRecords' by scanning ROOT and META tables, learning the 
   // set of all known valid regions.
 
-  private TreeMap<Text, HRegionInfo> unassignedRegions;
+  private SortedMap<Text, HRegionInfo> unassignedRegions;
 
   // The 'assignAttempts' table maps from regions to a timestamp that indicates 
   // the last time we *tried* to assign the region to a RegionServer. If the 
   // timestamp is out of date, then we can try to reassign it.
   
-  private TreeMap<Text, Long> assignAttempts;
+  private SortedMap<Text, Long> assignAttempts;
 
   // 'killList' indicates regions that we hope to close and then never reopen 
   // (because we're merging them, say).
 
-  private TreeMap<String, TreeMap<Text, HRegionInfo>> killList;
+  private SortedMap<String, TreeMap<Text, HRegionInfo>> killList;
 
   // 'serversToServerInfo' maps from the String to its HServerInfo
 
-  private TreeMap<String, HServerInfo> serversToServerInfo;
+  private SortedMap<String, HServerInfo> serversToServerInfo;
 
   /** Build the HMaster out of a raw configuration item. */
   public HMaster(Configuration conf) throws IOException {
     this(new Path(conf.get(HREGION_DIR, DEFAULT_HREGION_DIR)),
-         new HServerAddress(conf.get(MASTER_DEFAULT_NAME)),
-         conf);
+        new HServerAddress(conf.get(MASTER_ADDRESS, DEFAULT_MASTER_ADDRESS)),
+        conf);
   }
 
   /** 
@@ -391,28 +477,43 @@
 
     // Make sure the root directory exists!
     
-    if (!fs.exists(dir)) {
+    if(! fs.exists(dir)) {
       fs.mkdirs(dir);
     }
 
-    Path rootRegionDir = HStoreFile.getHRegionDir(dir, rootRegionInfo.regionName);
-    if (!fs.exists(rootRegionDir)) {
+    Path rootRegionDir = HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
+    if(! fs.exists(rootRegionDir)) {
+      LOG.debug("bootstrap: creating root and meta regions");
       
       // Bootstrap! Need to create the root region and the first meta region.
-      //TODO is the root region self referential?
 
-      HRegion root = createNewHRegion(rootTableDesc, 0L);
-      HRegion meta = createNewHRegion(metaTableDesc, 1L);
+      try {
+        HRegion root = createNewHRegion(HGlobals.rootTableDesc, 0L);
+        HRegion meta = createNewHRegion(HGlobals.metaTableDesc, 1L);
       
-      addTableToMeta(root, meta);
+        addTableToMeta(root, meta);
+        
+        root.close();
+        meta.close();
+        
+      } catch(IOException e) {
+        e.printStackTrace();
+      }
     }
 
     this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
     this.msgQueue = new Vector<PendingOperation>();
-    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 15 * 1000), 
-                                   conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+    this.serverLeases = new Leases(conf.getLong("hbase.master.lease.period", 30 * 1000), 
+        conf.getLong("hbase.master.lease.thread.wakefrequency", 15 * 1000));
+    
     this.server = RPC.getServer(this, address.getBindAddress(),
-                                address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+        address.getPort(), conf.getInt("hbase.hregionserver.handler.count", 10), false, conf);
+
+    //  The rpc-server port can be ephemeral... ensure we have the correct info
+    
+    this.address = new HServerAddress(server.getListenerAddress());
+    conf.set(MASTER_ADDRESS, address.toString());
+    
     this.client = new HClient(conf);
     
     this.metaRescanInterval
@@ -429,8 +530,12 @@
 
     this.numMetaRegions = 0;
     this.metaRegionsToScan = new Vector<MetaRegion>();
-    this.knownMetaRegions = new TreeMap<Text, MetaRegion>();
-    this.allMetaRegionsScanned = new Boolean(false);
+    
+    this.knownMetaRegions = 
+      Collections.synchronizedSortedMap(new TreeMap<Text, MetaRegion>());
+    
+    this.allMetaRegionsScanned = false;
+
     this.metaScanner = new MetaScanner();
     this.metaScannerThread = new Thread(metaScanner, "HMaster.metaScanner");
 
@@ -439,14 +544,22 @@
     this.clientProcessor = new ClientProcessor();
     this.clientProcessorThread = new Thread(clientProcessor, "HMaster.clientProcessor");
     
-    this.unassignedRegions = new TreeMap<Text, HRegionInfo>();
-    this.unassignedRegions.put(rootRegionInfo.regionName, rootRegionInfo);
+    this.unassignedRegions = 
+      Collections.synchronizedSortedMap(new TreeMap<Text, HRegionInfo>());
+    
+    this.unassignedRegions.put(HGlobals.rootRegionInfo.regionName, HGlobals.rootRegionInfo);
+    
+    this.assignAttempts = 
+      Collections.synchronizedSortedMap(new TreeMap<Text, Long>());
     
-    this.assignAttempts = new TreeMap<Text, Long>();
-    this.assignAttempts.put(rootRegionInfo.regionName, 0L);
+    this.assignAttempts.put(HGlobals.rootRegionInfo.regionName, 0L);
 
-    this.killList = new TreeMap<String, TreeMap<Text, HRegionInfo>>();
-    this.serversToServerInfo = new TreeMap<String, HServerInfo>();
+    this.killList = 
+      Collections.synchronizedSortedMap(
+          new TreeMap<String, TreeMap<Text, HRegionInfo>>());
+    
+    this.serversToServerInfo = 
+      Collections.synchronizedSortedMap(new TreeMap<String, HServerInfo>());
     
     // We're almost open for business
     
@@ -470,10 +583,11 @@
       this.closed = true;
       throw e;
     }
+    LOG.info("HMaster started");
   }
 
   /** Turn off the HMaster.  Turn off all the threads, close files, etc. */
-  public void close() throws IOException {
+  public void stop() throws IOException {
     closed = true;
 
     try {
@@ -481,9 +595,18 @@
       
     } catch(IOException iex) {
     }
-    
+    server.stop();
+    LOG.info("shutting down HMaster");
+  }
+  
+  /** returns the HMaster server address */
+  public HServerAddress getMasterAddress() {
+    return address;
+  }
+
+  /** Call this to wait for everything to finish */
+  public void join() {
     try {
-      server.stop();
       server.join();
       
     } catch(InterruptedException iex) {
@@ -503,6 +626,7 @@
       
     } catch(Exception iex) {
     }
+    LOG.info("HMaster stopped");
   }
 
   //////////////////////////////////////////////////////////////////////////////
@@ -514,79 +638,79 @@
     String server = serverInfo.getServerAddress().toString();
     HServerInfo storedInfo = null;
 
+    LOG.debug("received start message from: " + server);
+    
     // If we get the startup message but there's an old server by that
     // name, then we can timeout the old one right away and register
     // the new one.
     
-    synchronized(serversToServerInfo) {
-      storedInfo = serversToServerInfo.get(server);
-        
-      if (storedInfo != null) {
-        serversToServerInfo.remove(server);
+    storedInfo = serversToServerInfo.get(server);
 
-        synchronized(msgQueue) {
-          msgQueue.add(new PendingServerShutdown(storedInfo));
-          msgQueue.notifyAll();
-        }
+    if(storedInfo != null) {
+      serversToServerInfo.remove(server);
 
+      synchronized(msgQueue) {
+        msgQueue.add(new PendingServerShutdown(storedInfo));
+        msgQueue.notifyAll();
       }
+    }
 
-      // Either way, record the new server
+    // Either way, record the new server
 
-      serversToServerInfo.put(server, serverInfo);
+    serversToServerInfo.put(server, serverInfo);
 
-
-      Text serverLabel = new Text(server);        
-      serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
-    }
+    Text serverLabel = new Text(server);        
+    serverLeases.createLease(serverLabel, serverLabel, new ServerExpirer(server));
   }
 
   /** HRegionServers call this method repeatedly. */
   public HMsg[] regionServerReport(HServerInfo serverInfo, HMsg msgs[]) throws IOException {
     String server = serverInfo.getServerAddress().toString();
 
-    synchronized(serversToServerInfo) {
-      HServerInfo storedInfo = serversToServerInfo.get(server);
-      
-      if (storedInfo == null) {
-        
-        // The HBaseMaster may have been restarted.
-        // Tell the RegionServer to start over and call regionServerStartup()
-        
-        HMsg returnMsgs[] = new HMsg[1];
-        returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
-        return returnMsgs;
-        
-      } else if (storedInfo.getStartCode() != serverInfo.getStartCode()) {
-        
-        // This state is reachable if:
-        //
-        // 1) RegionServer A started
-        // 2) RegionServer B started on the same machine, then 
-        //    clobbered A in regionServerStartup.
-        // 3) RegionServer A returns, expecting to work as usual.
-        //
-        // The answer is to ask A to shut down for good.
-        
-        HMsg returnMsgs[] = new HMsg[1];
-        returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
-        return returnMsgs;
-        
-      } else {
-        
-        // All's well.  Renew the server's lease.
-        // This will always succeed; otherwise, the fetch of serversToServerInfo
-        // would have failed above.
-        
-        Text serverLabel = new Text(server);
-        serverLeases.renewLease(serverLabel, serverLabel);
+    HServerInfo storedInfo = serversToServerInfo.get(server);
 
-        // Refresh the info object
-        serversToServerInfo.put(server, serverInfo);
+    if(storedInfo == null) {
 
-        // Next, process messages for this server
-        return processMsgs(serverInfo, msgs);
-      }
+      LOG.debug("received server report from unknown server: " + server);
+
+      // The HBaseMaster may have been restarted.
+      // Tell the RegionServer to start over and call regionServerStartup()
+
+      HMsg returnMsgs[] = new HMsg[1];
+      returnMsgs[0] = new HMsg(HMsg.MSG_CALL_SERVER_STARTUP);
+      return returnMsgs;
+
+    } else if(storedInfo.getStartCode() != serverInfo.getStartCode()) {
+
+      // This state is reachable if:
+      //
+      // 1) RegionServer A started
+      // 2) RegionServer B started on the same machine, then 
+      //    clobbered A in regionServerStartup.
+      // 3) RegionServer A returns, expecting to work as usual.
+      //
+      // The answer is to ask A to shut down for good.
+
+      LOG.debug("region server race condition detected: " + server);
+
+      HMsg returnMsgs[] = new HMsg[1];
+      returnMsgs[0] = new HMsg(HMsg.MSG_REGIONSERVER_ALREADY_RUNNING);
+      return returnMsgs;
+
+    } else {
+
+      // All's well.  Renew the server's lease.
+      // This will always succeed; otherwise, the fetch of serversToServerInfo
+      // would have failed above.
+
+      Text serverLabel = new Text(server);
+      serverLeases.renewLease(serverLabel, serverLabel);
+
+      // Refresh the info object
+      serversToServerInfo.put(server, serverInfo);
+
+      // Next, process messages for this server
+      return processMsgs(serverInfo, msgs);
     }
   }
 
@@ -597,9 +721,9 @@
     // Process the kill list
     
     TreeMap<Text, HRegionInfo> regionsToKill = killList.get(info.toString());
-    if (regionsToKill != null) {
+    if(regionsToKill != null) {
       for(Iterator<HRegionInfo> i = regionsToKill.values().iterator();
-          i.hasNext();) {
+          i.hasNext(); ) {
         
         returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_AND_DELETE, i.next()));
       }
@@ -607,151 +731,177 @@
 
     // Get reports on what the RegionServer did.
     
-    synchronized(unassignedRegions) {
-      for(int i = 0; i < incomingMsgs.length; i++) {
-        HRegionInfo region = incomingMsgs[i].getRegionInfo();
-        
-        switch(incomingMsgs[i].getMsg()) {
+    for(int i = 0; i < incomingMsgs.length; i++) {
+      HRegionInfo region = incomingMsgs[i].getRegionInfo();
 
-        case HMsg.MSG_REPORT_OPEN:
-          HRegionInfo regionInfo = unassignedRegions.get(region.regionName);
+      switch(incomingMsgs[i].getMsg()) {
 
-          if (regionInfo == null) {
+      case HMsg.MSG_REPORT_OPEN:
+        HRegionInfo regionInfo = unassignedRegions.get(region.regionName);
 
-            // This Region should not have been opened.
-            // Ask the server to shut it down, but don't report it as closed.  
-            // Otherwise the HMaster will think the Region was closed on purpose, 
-            // and then try to reopen it elsewhere; that's not what we want.
+        if(regionInfo == null) {
 
-            returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
+          LOG.debug("region server " + info.getServerAddress().toString()
+              + " should not have opened region " + region.regionName);
 
-          } else {
+          // This Region should not have been opened.
+          // Ask the server to shut it down, but don't report it as closed.  
+          // Otherwise the HMaster will think the Region was closed on purpose, 
+          // and then try to reopen it elsewhere; that's not what we want.
 
-            // Remove from unassigned list so we don't assign it to someone else
+          returnMsgs.add(new HMsg(HMsg.MSG_REGION_CLOSE_WITHOUT_REPORT, region)); 
 
-            unassignedRegions.remove(region.regionName);
-            assignAttempts.remove(region.regionName);
+        } else {
 
-            if (region.regionName.compareTo(rootRegionInfo.regionName) == 0) {
+          LOG.debug(info.getServerAddress().toString() + " serving "
+              + region.regionName);
 
-              // Store the Root Region location (in memory)
+          // Remove from unassigned list so we don't assign it to someone else
 
-              rootRegionLocation = new HServerAddress(info.getServerAddress());
-              
-              // Wake up the root scanner
-              
-              rootRegionLocation.notifyAll();
-              break;
-              
-            } else if (region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+          unassignedRegions.remove(region.regionName);
+          assignAttempts.remove(region.regionName);
 
-              // It's a meta region. Put it on the queue to be scanned.
-              
-              MetaRegion r = new MetaRegion();
-              r.server = info.getServerAddress();
-              r.regionName = region.regionName;
-              r.startKey = region.startKey;
-              
-              synchronized(metaRegionsToScan) {
-                metaRegionsToScan.add(r);
-                metaRegionsToScan.notifyAll();
-              }
-            }
-            
-            // Queue up an update to note the region location.
+          if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) {
+
+            // Store the Root Region location (in memory)
+
+            rootRegionLocation = new HServerAddress(info.getServerAddress());
+
+            // Wake up threads waiting for the root server
+
+            rootRegionIsAvailable();
+            break;
+
+          } else if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+
+            // It's a meta region. Put it on the queue to be scanned.
 
-            synchronized(msgQueue) {
-              msgQueue.add(new PendingOpenReport(info, region.regionName));
-              msgQueue.notifyAll();
+            MetaRegion r = new MetaRegion();
+            r.server = info.getServerAddress();
+            r.regionName = region.regionName;
+            r.startKey = region.startKey;
+
+            synchronized(metaRegionsToScan) {
+              metaRegionsToScan.add(r);
+              metaRegionsToScan.notifyAll();
             }
           }
-          break;
 
-        case HMsg.MSG_REPORT_CLOSE:
-          if (region.regionName.compareTo(rootRegionInfo.regionName) == 0) { // Root region
-            rootRegionLocation = null;
-            unassignedRegions.put(region.regionName, region);
-            assignAttempts.put(region.regionName, 0L);
+          // Queue up an update to note the region location.
 
-          } else {
-            boolean reassignRegion = true;
-            
-            if (regionsToKill.containsKey(region.regionName)) {
+          synchronized(msgQueue) {
+            msgQueue.add(new PendingOpenReport(info, region.regionName));
+            msgQueue.notifyAll();
+          }
+        }
+        break;
+
+      case HMsg.MSG_REPORT_CLOSE:
+        LOG.debug(info.getServerAddress().toString() + " no longer serving "
+            + region.regionName);
+
+        if(region.regionName.compareTo(HGlobals.rootRegionInfo.regionName) == 0) { // Root region
+          rootRegionLocation = null;
+          unassignedRegions.put(region.regionName, region);
+          assignAttempts.put(region.regionName, 0L);
+
+        } else {
+          boolean reassignRegion = true;
+
+          synchronized(regionsToKill) {
+            if(regionsToKill.containsKey(region.regionName)) {
               regionsToKill.remove(region.regionName);
-              
-              if (regionsToKill.size() > 0) {
+
+              if(regionsToKill.size() > 0) {
                 killList.put(info.toString(), regionsToKill);
-                
+
               } else {
                 killList.remove(info.toString());
               }
               reassignRegion = false;
             }
-            
-            synchronized(msgQueue) {
-              msgQueue.add(new PendingCloseReport(region, reassignRegion));
-              msgQueue.notifyAll();
-            }
-
-            // NOTE: we cannot put the region into unassignedRegions as that
-            //       could create a race with the pending close if it gets 
-            //       reassigned before the close is processed.
-
           }
-          break;
 
-        case HMsg.MSG_NEW_REGION:
-          if (region.regionName.find(META_TABLE_NAME.toString()) == 0) {
-            // A meta region has split.
-            
-            allMetaRegionsScanned = false;
-          }
-          synchronized(unassignedRegions) {
-            unassignedRegions.put(region.regionName, region);
-            assignAttempts.put(region.regionName, 0L);
+          synchronized(msgQueue) {
+            msgQueue.add(new PendingCloseReport(region, reassignRegion));
+            msgQueue.notifyAll();
           }
-          break;
-          
-        default:
-          throw new IOException("Impossible state during msg processing.  Instruction: "
-                                + incomingMsgs[i].getMsg());
+
+          // NOTE: we cannot put the region into unassignedRegions as that
+          //       could create a race with the pending close if it gets 
+          //       reassigned before the close is processed.
+
         }
+        break;
+
+      case HMsg.MSG_NEW_REGION:
+        LOG.debug("new region " + region.regionName);
+
+        if(region.regionName.find(META_TABLE_NAME.toString()) == 0) {
+          // A meta region has split.
+
+          allMetaRegionsScanned = false;
+        }
+        unassignedRegions.put(region.regionName, region);
+        assignAttempts.put(region.regionName, 0L);
+        break;
+
+      default:
+        throw new IOException("Impossible state during msg processing.  Instruction: "
+            + incomingMsgs[i].getMsg());
       }
+    }
 
-      // Figure out what the RegionServer ought to do, and write back.
+    // Figure out what the RegionServer ought to do, and write back.
 
-      if (unassignedRegions.size() > 0) {
+    if(unassignedRegions.size() > 0) {
 
-        // Open new regions as necessary
+      // Open new regions as necessary
 
-        int targetForServer = (int) Math.ceil(unassignedRegions.size()
-                                              / (1.0 * serversToServerInfo.size()));
+      int targetForServer = (int) Math.ceil(unassignedRegions.size()
+          / (1.0 * serversToServerInfo.size()));
 
-        int counter = 0;
-        long now = System.currentTimeMillis();
+      int counter = 0;
+      long now = System.currentTimeMillis();
 
-        for(Iterator<Text> it = unassignedRegions.keySet().iterator();
-            it.hasNext();) {
+      for(Iterator<Text> it = unassignedRegions.keySet().iterator();
+      it.hasNext(); ) {
 
-          Text curRegionName = it.next();
-          HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
-          long assignedTime = assignAttempts.get(curRegionName);
+        Text curRegionName = it.next();
+        HRegionInfo regionInfo = unassignedRegions.get(curRegionName);
+        long assignedTime = assignAttempts.get(curRegionName);
 
-          if (now - assignedTime > maxRegionOpenTime) {
-            returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
+        if(now - assignedTime > maxRegionOpenTime) {
+          LOG.debug("assigning region " + regionInfo.regionName + " to server "
+              + info.getServerAddress().toString());
 
-            assignAttempts.put(curRegionName, now);
-            counter++;
-          }
+          returnMsgs.add(new HMsg(HMsg.MSG_REGION_OPEN, regionInfo));
 
-          if (counter >= targetForServer) {
-            break;
-          }
+          assignAttempts.put(curRegionName, now);
+          counter++;
+        }
+
+        if(counter >= targetForServer) {
+          break;
         }
       }
     }
     return (HMsg[]) returnMsgs.toArray(new HMsg[returnMsgs.size()]);
   }
+  
+  private synchronized void rootRegionIsAvailable() {
+    notifyAll();
+  }
+  
+  private synchronized void waitForRootRegion() {
+    while(rootRegionLocation == null) {
+      try {
+        wait();
+        
+      } catch(InterruptedException e) {
+      }
+    }
+  }
 
   //////////////////////////////////////////////////////////////////////////////
   // Some internal classes to manage msg-passing and client operations
@@ -773,24 +923,21 @@
             } catch(InterruptedException iex) {
             }
           }
-          op = msgQueue.elementAt(msgQueue.size()-1);
-          msgQueue.removeElementAt(msgQueue.size()-1);
+          op = msgQueue.remove(msgQueue.size()-1);
         }
         try {
           op.process();
           
         } catch(Exception ex) {
-          synchronized(msgQueue) {
-            msgQueue.insertElementAt(op, 0);
-          }
+          msgQueue.insertElementAt(op, 0);
         }
       }
     }
   }
 
-  abstract class PendingOperation {
+  private abstract class PendingOperation {
     protected final Text[] columns = {
-      META_COLUMN_FAMILY
+        META_COLUMN_FAMILY
     };
     protected final Text startRow = new Text();
     protected long clientId;
@@ -802,9 +949,9 @@
     public abstract void process() throws IOException;
   }
   
-  class PendingServerShutdown extends PendingOperation {
-    String deadServer;
-    long oldStartCode;
+  private class PendingServerShutdown extends PendingOperation {
+    private String deadServer;
+    private long oldStartCode;
     
     public PendingServerShutdown(HServerInfo serverInfo) {
       super();
@@ -812,30 +959,40 @@
       this.oldStartCode = serverInfo.getStartCode();
     }
     
-    private void scanMetaRegion(HRegionInterface server, HScannerInterface scanner,
-                                Text regionName) throws IOException {
+    private void scanMetaRegion(HRegionInterface server, long scannerId,
+        Text regionName) throws IOException {
 
       Vector<HStoreKey> toDoList = new Vector<HStoreKey>();
       TreeMap<Text, HRegionInfo> regions = new TreeMap<Text, HRegionInfo>();
 
       DataInputBuffer inbuf = new DataInputBuffer();
       try {
-        HStoreKey key = new HStoreKey();
-        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+        LabelledData[] values = null;
 
-        while(scanner.next(key, results)) {
-          byte serverBytes[] = results.get(META_COL_SERVER);
-          String serverName = new String(serverBytes, UTF8_ENCODING);
+        while(true) {
+          HStoreKey key = new HStoreKey();
+          values = server.next(scannerId, key);
+          if(values.length == 0) {
+            break;
+          }
+
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          for(int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
+          
+          String serverName = 
+            new String(results.get(META_COL_SERVER), UTF8_ENCODING);
 
-          if (deadServer.compareTo(serverName) != 0) {
+          if(deadServer.compareTo(serverName) != 0) {
             // This isn't the server you're looking for - move along
             continue;
           }
 
-          byte startCodeBytes[] = results.get(META_COL_STARTCODE);
-          long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+          long startCode = 
+            Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
 
-          if (oldStartCode != startCode) {
+          if(oldStartCode != startCode) {
             // Close but no cigar
             continue;
           }
@@ -847,6 +1004,8 @@
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
 
+          LOG.debug(serverName + " was serving " + info.regionName);
+          
           // Add to our to do lists
 
           toDoList.add(key);
@@ -854,7 +1013,16 @@
         }
 
       } finally {
-        scanner.close();
+        if(scannerId != -1L) {
+          try {
+            server.close(scannerId);
+            
+          } catch(IOException e) {
+            e.printStackTrace();
+            
+          }
+        }
+        scannerId = -1L;
       }
 
       // Remove server from root/meta entries
@@ -869,56 +1037,51 @@
       // Put all the regions we found on the unassigned region list
 
       for(Iterator<Map.Entry<Text, HRegionInfo>> i = regions.entrySet().iterator();
-          i.hasNext();) {
+          i.hasNext(); ) {
 
         Map.Entry<Text, HRegionInfo> e = i.next();
         Text region = e.getKey();
         HRegionInfo regionInfo = e.getValue();
 
-        synchronized(unassignedRegions) {
-          unassignedRegions.put(region, regionInfo);
-          assignAttempts.put(region, 0L);
-        }
+        unassignedRegions.put(region, regionInfo);
+        assignAttempts.put(region, 0L);
       }
     }
     
     public void process() throws IOException {
+      LOG.debug("server shutdown: " + deadServer);
+
+      // Scan the ROOT region
+      
+      waitForRootRegion();      // Wait until the root region is available
+      HRegionInterface server = client.getHRegionConnection(rootRegionLocation);
+      long scannerId = 
+        server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow);
+      
+      scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
 
       // We can not scan every meta region if they have not already been assigned
       // and scanned.
-      
-      while(!allMetaRegionsScanned) {
-        try {
-          allMetaRegionsScanned.wait();
-          
-        } catch(InterruptedException e) {
-        }
-      }
 
-      // First scan the ROOT region
-      
-      HRegionInterface server = client.getHRegionConnection(rootRegionLocation);
-      HScannerInterface scanner = server.openScanner(rootRegionInfo.regionName,
-                                                     columns, startRow);
+      metaScanner.waitForMetaScan();
       
-      scanMetaRegion(server, scanner, rootRegionInfo.regionName);
       for(Iterator<MetaRegion> i = knownMetaRegions.values().iterator();
-          i.hasNext();) {
+          i.hasNext(); ) {
         
         MetaRegion r = i.next();
 
         server = client.getHRegionConnection(r.server);
-        scanner = server.openScanner(r.regionName, columns, startRow);
-        scanMetaRegion(server, scanner, r.regionName);
+        scannerId = server.openScanner(r.regionName, columns, startRow);
+        scanMetaRegion(server, scannerId, r.regionName);
       }
     }
   }
   
   /** PendingCloseReport is a close message that is saved in a different thread. */
-  class PendingCloseReport extends PendingOperation {
-    HRegionInfo regionInfo;
-    boolean reassignRegion;
-    boolean rootRegion;
+  private class PendingCloseReport extends PendingOperation {
+    private HRegionInfo regionInfo;
+    private boolean reassignRegion;
+    private boolean rootRegion;
     
     public PendingCloseReport(HRegionInfo regionInfo, boolean reassignRegion) {
       super();
@@ -929,7 +1092,7 @@
       // If the region closing down is a meta region then we need to update
       // the ROOT table
       
-      if (this.regionInfo.regionName.find(metaTableDesc.getName().toString()) == 0) {
+      if(this.regionInfo.regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) {
         this.rootRegion = true;
         
       } else {
@@ -938,24 +1101,21 @@
     }
     
     public void process() throws IOException {
-
+      
       // We can not access any meta region if they have not already been assigned
       // and scanned.
+
+      metaScanner.waitForMetaScan();
       
-      while(!allMetaRegionsScanned) {
-        try {
-          allMetaRegionsScanned.wait();
-          
-        } catch(InterruptedException e) {
-        }
-      }
+      LOG.debug("region closed: " + regionInfo.regionName);
 
       // Mark the Region as unavailable in the appropriate meta table
 
       Text metaRegionName;
       HRegionInterface server;
-      if (rootRegion) {
-        metaRegionName = rootRegionInfo.regionName;
+      if(rootRegion) {
+        metaRegionName = HGlobals.rootRegionInfo.regionName;
+        waitForRootRegion();            // Make sure root region available
         server = client.getHRegionConnection(rootRegionLocation);
         
       } else {
@@ -969,24 +1129,24 @@
       server.delete(metaRegionName, clientId, lockid, META_COL_STARTCODE);
       server.commit(metaRegionName, clientId, lockid);
       
-      if (reassignRegion) {
-        synchronized(unassignedRegions) {
-          unassignedRegions.put(regionInfo.regionName, regionInfo);
-          assignAttempts.put(regionInfo.regionName, 0L);
-        }
+      if(reassignRegion) {
+        LOG.debug("reassign region: " + regionInfo.regionName);
+        
+        unassignedRegions.put(regionInfo.regionName, regionInfo);
+        assignAttempts.put(regionInfo.regionName, 0L);
       }
     }
   }
 
   /** PendingOpenReport is an open message that is saved in a different thread. */
-  class PendingOpenReport extends PendingOperation {
-    boolean rootRegion;
-    Text regionName;
-    BytesWritable serverAddress;
-    BytesWritable startCode;
+  private class PendingOpenReport extends PendingOperation {
+    private boolean rootRegion;
+    private Text regionName;
+    private BytesWritable serverAddress;
+    private BytesWritable startCode;
     
     public PendingOpenReport(HServerInfo info, Text regionName) {
-      if (regionName.find(metaTableDesc.getName().toString()) == 0) {
+      if(regionName.find(HGlobals.metaTableDesc.getName().toString()) == 0) {
         
         // The region which just came on-line is a META region.
         // We need to look in the ROOT region for its information.
@@ -1003,10 +1163,10 @@
       
       try {
         this.serverAddress = new BytesWritable(
-                                               info.getServerAddress().toString().getBytes(UTF8_ENCODING));
+            info.getServerAddress().toString().getBytes(UTF8_ENCODING));
         
         this.startCode = new BytesWritable(
-                                           String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
+            String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
         
       } catch(UnsupportedEncodingException e) {
       }
@@ -1017,21 +1177,18 @@
 
       // We can not access any meta region if they have not already been assigned
       // and scanned.
+
+      metaScanner.waitForMetaScan();
       
-      while(!allMetaRegionsScanned) {
-        try {
-          allMetaRegionsScanned.wait();
-          
-        } catch(InterruptedException e) {
-        }
-      }
+      LOG.debug(regionName + " open on " + serverAddress.toString());
 
       // Register the newly-available Region's location.
 
       Text metaRegionName;
       HRegionInterface server;
-      if (rootRegion) {
-        metaRegionName = rootRegionInfo.regionName;
+      if(rootRegion) {
+        metaRegionName = HGlobals.rootRegionInfo.regionName;
+        waitForRootRegion();            // Make sure root region available
         server = client.getHRegionConnection(rootRegionLocation);
         
       } else {
@@ -1056,15 +1213,9 @@
     
     // We can not access any meta region if they have not already been assigned
     // and scanned.
-    
-    while(!allMetaRegionsScanned) {
-      try {
-        allMetaRegionsScanned.wait();
-        
-      } catch(InterruptedException e) {
-      }
-    }
 
+    metaScanner.waitForMetaScan();
+    
     // 1. Check to see if table already exists
 
     Text metaStartRow = knownMetaRegions.headMap(newRegion.regionName).lastKey();
@@ -1074,13 +1225,13 @@
 
 
     BytesWritable bytes = server.get(metaRegionName, desc.getName(), META_COL_REGIONINFO);
-    if (bytes != null && bytes.getSize() != 0) {
+    if(bytes != null && bytes.getSize() != 0) {
       byte[] infoBytes = bytes.get();
       DataInputBuffer inbuf = new DataInputBuffer();
       inbuf.reset(infoBytes, infoBytes.length);
       HRegionInfo info = new HRegionInfo();
       info.readFields(inbuf);
-      if (info.tableDesc.getName().compareTo(desc.getName()) == 0) {
+      if(info.tableDesc.getName().compareTo(desc.getName()) == 0) {
         throw new IOException("table already exists");
       }
     }
@@ -1100,15 +1251,19 @@
     long clientId = rand.nextLong();
     long lockid = server.startUpdate(metaRegionName, clientId, regionName);
     server.put(metaRegionName, clientId, lockid, META_COL_REGIONINFO, 
-               new BytesWritable(byteValue.toByteArray()));
+        new BytesWritable(byteValue.toByteArray()));
     server.commit(metaRegionName, clientId, lockid);
     
-    // 4. Get it assigned to a server
+    // 4. Close the new region to flush it to disk
     
-    synchronized(unassignedRegions) {
-      unassignedRegions.put(regionName, info);
-      assignAttempts.put(regionName, 0L);
-    }
+    r.close();
+    
+    // 5. Get it assigned to a server
+    
+    unassignedRegions.put(regionName, info);
+    assignAttempts.put(regionName, 0L);
+    
+    LOG.debug("created table " + desc.getName());
   }
 
   /**
@@ -1122,14 +1277,14 @@
    * @throws IOException
    */
   private HRegion createNewHRegion(HTableDescriptor desc, long regionId) 
-    throws IOException {
+      throws IOException {
     
     HRegionInfo info = new HRegionInfo(regionId, desc, null, null);
     Path regionDir = HStoreFile.getHRegionDir(dir, info.regionName);
     fs.mkdirs(regionDir);
 
     return new HRegion(dir, new HLog(fs, new Path(regionDir, "log"), conf), fs,
-                       conf, info, null, null);
+        conf, info, null, null);
   }
   
   /**
@@ -1152,96 +1307,99 @@
 
     table.getRegionInfo().write(s);
     
-    s.writeLong(table.getRegionId());
     meta.put(writeid, META_COL_REGIONINFO, bytes.toByteArray());
     
-    bytes.reset();
-    new HServerAddress().write(s);
-    meta.put(writeid, META_COL_SERVER, bytes.toByteArray());
-    
-    bytes.reset();
-    s.writeLong(0L);
-    meta.put(writeid, META_COL_STARTCODE, bytes.toByteArray());
-    
     meta.commit(writeid);
   }
   
   public void deleteTable(Text tableName) throws IOException {
     Text[] columns = {
-      META_COLUMN_FAMILY
+        META_COLUMN_FAMILY
     };
     
     // We can not access any meta region if they have not already been assigned
     // and scanned.
-    
-    while(!allMetaRegionsScanned) {
-      try {
-        allMetaRegionsScanned.wait();
-        
-      } catch(InterruptedException e) {
-      }
-    }
 
-    for(Iterator<MetaRegion> i = knownMetaRegions.tailMap(tableName).values().iterator();
-        i.hasNext();) {
+    metaScanner.waitForMetaScan();
+    
+    for(Iterator<MetaRegion> it = knownMetaRegions.tailMap(tableName).values().iterator();
+        it.hasNext(); ) {
 
       // Find all the regions that make up this table
       
       long clientId = rand.nextLong();
-      MetaRegion m = i.next();
+      MetaRegion m = it.next();
       HRegionInterface server = client.getHRegionConnection(m.server);
+      long scannerId = -1L;
       try {
-        HScannerInterface scanner
-          = server.openScanner(m.regionName, columns, tableName);
-        
-        HStoreKey key = new HStoreKey();
-        TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-        DataInputBuffer inbuf = new DataInputBuffer();
+        scannerId = server.openScanner(m.regionName, columns, tableName);
         
         Vector<Text> rowsToDelete = new Vector<Text>();
         
-        while(scanner.next(key, results)) {
-          byte hRegionInfoBytes[] = results.get(META_COL_REGIONINFO);
-          inbuf.reset(hRegionInfoBytes, hRegionInfoBytes.length);
+        DataInputBuffer inbuf = new DataInputBuffer();
+        while(true) {
+          LabelledData[] values = null;
+          HStoreKey key = new HStoreKey();
+          values = server.next(scannerId, key);
+          if(values.length == 0) {
+            break;
+          }
+          TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+          for(int i = 0; i < values.length; i++) {
+            results.put(values[i].getLabel(), values[i].getData().get());
+          }
+          byte bytes[] = results.get(META_COL_REGIONINFO);
+          inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
           info.readFields(inbuf);
 
-          if (info.tableDesc.getName().compareTo(tableName) > 0) {
+          if(info.tableDesc.getName().compareTo(tableName) > 0) {
             break;                      // Beyond any more entries for this table
           }
 
           // Is it being served?
           
-          byte serverBytes[] = results.get(META_COL_SERVER);
-          String serverName = new String(serverBytes, UTF8_ENCODING);
+          String serverName =
+            new String(results.get(META_COL_SERVER), UTF8_ENCODING);
 
-          byte startCodeBytes[] = results.get(META_COL_STARTCODE);
-          long startCode = Long.decode(new String(startCodeBytes, UTF8_ENCODING));
+          long startCode = 
+            Long.valueOf(new String(results.get(META_COL_STARTCODE), UTF8_ENCODING));
 
-          synchronized(serversToServerInfo) {
-            HServerInfo s = serversToServerInfo.get(serverName);
-            if (s != null && s.getStartCode() == startCode) {
-              
-              // It is being served. Tell the server to stop it and not report back
-              
-              TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
-              if (regionsToKill == null) {
-                regionsToKill = new TreeMap<Text, HRegionInfo>();
-              }
-              regionsToKill.put(info.regionName, info);
-              killList.put(serverName, regionsToKill);
+          HServerInfo s = serversToServerInfo.get(serverName);
+          if(s != null && s.getStartCode() == startCode) {
+
+            // It is being served. Tell the server to stop it and not report back
+
+            TreeMap<Text, HRegionInfo> regionsToKill = killList.get(serverName);
+            if(regionsToKill == null) {
+              regionsToKill = new TreeMap<Text, HRegionInfo>();
             }
+            regionsToKill.put(info.regionName, info);
+            killList.put(serverName, regionsToKill);
           }
         }
-        for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext();) {
+        for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
           long lockid = server.startUpdate(m.regionName, clientId, row.next());
           server.delete(m.regionName, clientId, lockid, columns[0]);
           server.commit(m.regionName, clientId, lockid);
         }
       } catch(IOException e) {
         e.printStackTrace();
+        
+      } finally {
+        if(scannerId != -1L) {
+          try {
+            server.close(scannerId);
+            
+          } catch(IOException e) {
+            e.printStackTrace();
+            
+          }
+        }
+        scannerId = -1L;
       }
     }
+    LOG.debug("deleted table: " + tableName);
   }
   
   public HServerAddress findRootRegion() {
@@ -1252,24 +1410,48 @@
   // Managing leases
   //////////////////////////////////////////////////////////////////////////////
   
-  class ServerExpirer extends LeaseListener {
-    String server = null;
+  private class ServerExpirer extends LeaseListener {
+    private String server;
     
     public ServerExpirer(String server) {
       this.server = new String(server);
     }
     
     public void leaseExpired() {
-      HServerInfo storedInfo = null;
+      LOG.debug(server + " lease expired");
       
-      synchronized(serversToServerInfo) {
-        storedInfo = serversToServerInfo.remove(server);
-      }
+      HServerInfo storedInfo = serversToServerInfo.remove(server);
       synchronized(msgQueue) {
         msgQueue.add(new PendingServerShutdown(storedInfo));
         msgQueue.notifyAll();
       }
     }
+  }
+
+  private static void printUsage() {
+    System.err.println("Usage: java org.apache.hadoop.hbase.HMaster " +
+        "[--bind=hostname:port]");
+  }
+  
+  public static void main(String [] args) throws IOException {
+    Configuration conf = new HBaseConfiguration();
+    
+    // Process command-line args. TODO: Better cmd-line processing
+    // (but hopefully something not as painful as cli options).
+    for (String cmd: args) {
+      if (cmd.equals("-h") || cmd.startsWith("--h")) {
+        printUsage();
+        return;
+      }
+      
+      final String addressArgKey = "--bind=";
+      if (cmd.startsWith(addressArgKey)) {
+        conf.set(MASTER_ADDRESS,
+            cmd.substring(addressArgKey.length()));
+      }
+    }
+    
+    new HMaster(conf);
   }
 }
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterInterface.java Tue Apr 24 14:13:08 2007
@@ -15,7 +15,8 @@
  */
 package org.apache.hadoop.hbase;
 
-import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.VersionedProtocol;
 
 import java.io.IOException;
 
@@ -23,7 +24,7 @@
  * Clients interact with the HMasterInterface to gain access to meta-level HBase
  * functionality, like finding an HRegionServer and creating/destroying tables.
  ******************************************************************************/
-public interface HMasterInterface {
+public interface HMasterInterface extends VersionedProtocol {
   public static final long versionID = 1L; // initial version
 
   //////////////////////////////////////////////////////////////////////////////

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMasterRegionInterface.java Tue Apr 24 14:13:08 2007
@@ -15,13 +15,15 @@
  */
 package org.apache.hadoop.hbase;
 
-import java.io.*;
+import java.io.IOException;
+import org.apache.hadoop.ipc.VersionedProtocol;
+
 /*******************************************************************************
  * HRegionServers interact with the HMasterRegionInterface to report on local 
  * goings-on and to obtain data-handling instructions from the HMaster.
  *********************************************/
-public interface HMasterRegionInterface {
-  public static final long versionId = 1L;
+public interface HMasterRegionInterface extends VersionedProtocol {
+  public static final long versionID = 1L;
   public void regionServerStartup(HServerInfo info) throws IOException;
   public HMsg[] regionServerReport(HServerInfo info, HMsg msgs[]) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Tue Apr 24 14:13:08 2007
@@ -22,6 +22,8 @@
 
 import java.io.*;
 import java.util.*;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /*******************************************************************************
  * The HMemcache holds in-memory modifications to the HRegion.  This is really a
@@ -31,14 +33,14 @@
   private static final Log LOG = LogFactory.getLog(HMemcache.class);
   
   TreeMap<HStoreKey, BytesWritable> memcache 
-    = new TreeMap<HStoreKey, BytesWritable>();
+      = new TreeMap<HStoreKey, BytesWritable>();
   
   Vector<TreeMap<HStoreKey, BytesWritable>> history 
-    = new Vector<TreeMap<HStoreKey, BytesWritable>>();
+      = new Vector<TreeMap<HStoreKey, BytesWritable>>();
   
   TreeMap<HStoreKey, BytesWritable> snapshot = null;
 
-  HLocking locking = new HLocking();
+  ReadWriteLock locker = new ReentrantReadWriteLock();
 
   public HMemcache() {
   }
@@ -52,23 +54,27 @@
   }
   
   /**
-   * We want to return a snapshot of the current HMemcache with a known HLog 
+   * Returns a snapshot of the current HMemcache with a known HLog 
    * sequence number at the same time.
-   * 
-   * Return both the frozen HMemcache TreeMap, as well as the HLog seq number.
    *
-   * We need to prevent any writing to the cache during this time, so we obtain 
-   * a write lock for the duration of the operation.
+   * We need to prevent any writing to the cache during this time,
+   * so we obtain a write lock for the duration of the operation.
+   * 
+   * <p>If this method returns non-null, client must call
+   * {@link #deleteSnapshot()} to clear 'snapshot-in-progress'
+   * state when finished with the returned {@link Snapshot}.
+   * 
+   * @return frozen HMemcache TreeMap and HLog sequence number.
    */
   public Snapshot snapshotMemcacheForLog(HLog log) throws IOException {
     Snapshot retval = new Snapshot();
 
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
-      if (snapshot != null) {
+      if(snapshot != null) {
         throw new IOException("Snapshot in progress!");
       }
-      if (memcache.size() == 0) {
+      if(memcache.size() == 0) {
         LOG.debug("memcache empty. Skipping snapshot");
         return retval;
       }
@@ -86,7 +92,7 @@
       return retval;
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
@@ -96,19 +102,19 @@
    * Modifying the structure means we need to obtain a writelock.
    */
   public void deleteSnapshot() throws IOException {
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
 
     try {
-      if (snapshot == null) {
+      if(snapshot == null) {
         throw new IOException("Snapshot not present!");
       }
       LOG.debug("deleting snapshot");
       
       for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator(); 
-          it.hasNext();) {
+          it.hasNext(); ) {
         
         TreeMap<HStoreKey, BytesWritable> cur = it.next();
-        if (snapshot == cur) {
+        if(snapshot == cur) {
           it.remove();
           break;
         }
@@ -118,7 +124,7 @@
       LOG.debug("snapshot deleted");
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
@@ -128,9 +134,9 @@
    * Operation uses a write lock.
    */
   public void add(Text row, TreeMap<Text, byte[]> columns, long timestamp) {
-    locking.obtainWriteLock();
+    this.locker.writeLock().lock();
     try {
-      for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext();) {
+      for(Iterator<Text> it = columns.keySet().iterator(); it.hasNext(); ) {
         Text column = it.next();
         byte[] val = columns.get(column);
 
@@ -139,7 +145,7 @@
       }
       
     } finally {
-      locking.releaseWriteLock();
+      this.locker.writeLock().unlock();
     }
   }
 
@@ -150,13 +156,13 @@
    */
   public byte[][] get(HStoreKey key, int numVersions) {
     Vector<byte[]> results = new Vector<byte[]>();
-    locking.obtainReadLock();
+    this.locker.readLock().lock();
     try {
       Vector<byte[]> result = get(memcache, key, numVersions-results.size());
       results.addAll(0, result);
 
       for(int i = history.size()-1; i >= 0; i--) {
-        if (numVersions > 0 && results.size() >= numVersions) {
+        if(numVersions > 0 && results.size() >= numVersions) {
           break;
         }
         
@@ -164,7 +170,7 @@
         results.addAll(results.size(), result);
       }
       
-      if (results.size() == 0) {
+      if(results.size() == 0) {
         return null;
         
       } else {
@@ -172,7 +178,7 @@
       }
       
     } finally {
-      locking.releaseReadLock();
+      this.locker.readLock().unlock();
     }
   }
 
@@ -184,7 +190,7 @@
    */
   public TreeMap<Text, byte[]> getFull(HStoreKey key) throws IOException {
     TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-    locking.obtainReadLock();
+    this.locker.readLock().lock();
     try {
       internalGetFull(memcache, key, results);
       for(int i = history.size()-1; i >= 0; i--) {
@@ -194,25 +200,25 @@
       return results;
       
     } finally {
-      locking.releaseReadLock();
+      this.locker.readLock().unlock();
     }
   }
   
   void internalGetFull(TreeMap<HStoreKey, BytesWritable> map, HStoreKey key, 
-                       TreeMap<Text, byte[]> results) {
+      TreeMap<Text, byte[]> results) {
     
     SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(key);
     
-    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext();) {
+    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
       HStoreKey itKey = it.next();
       Text itCol = itKey.getColumn();
 
-      if (results.get(itCol) == null
+      if(results.get(itCol) == null
           && key.matchesWithoutColumn(itKey)) {
         BytesWritable val = tailMap.get(itKey);
         results.put(itCol, val.get());
         
-      } else if (key.getRow().compareTo(itKey.getRow()) > 0) {
+      } else if(key.getRow().compareTo(itKey.getRow()) > 0) {
         break;
       }
     }
@@ -232,15 +238,15 @@
     HStoreKey curKey = new HStoreKey(key.getRow(), key.getColumn(), key.getTimestamp());
     SortedMap<HStoreKey, BytesWritable> tailMap = map.tailMap(curKey);
 
-    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext();) {
+    for(Iterator<HStoreKey> it = tailMap.keySet().iterator(); it.hasNext(); ) {
       HStoreKey itKey = it.next();
       
-      if (itKey.matchesRowCol(curKey)) {
+      if(itKey.matchesRowCol(curKey)) {
         result.add(tailMap.get(itKey).get());
         curKey.setVersion(itKey.getTimestamp() - 1);
       }
       
-      if (numVersions > 0 && result.size() >= numVersions) {
+      if(numVersions > 0 && result.size() >= numVersions) {
         break;
       }
     }
@@ -251,7 +257,7 @@
    * Return a scanner over the keys in the HMemcache
    */
   public HScannerInterface getScanner(long timestamp, Text targetCols[], Text firstRow)
-    throws IOException {
+      throws IOException {
     
     return new HMemcacheScanner(timestamp, targetCols, firstRow);
   }
@@ -267,16 +273,16 @@
 
     @SuppressWarnings("unchecked")
     public HMemcacheScanner(long timestamp, Text targetCols[], Text firstRow)
-      throws IOException {
+        throws IOException {
       
       super(timestamp, targetCols);
       
-      locking.obtainReadLock();
+      locker.readLock().lock();
       try {
         this.backingMaps = new TreeMap[history.size() + 1];
         int i = 0;
         for(Iterator<TreeMap<HStoreKey, BytesWritable>> it = history.iterator();
-            it.hasNext();) {
+            it.hasNext(); ) {
           
           backingMaps[i++] = it.next();
         }
@@ -290,7 +296,7 @@
 
         HStoreKey firstKey = new HStoreKey(firstRow);
         for(i = 0; i < backingMaps.length; i++) {
-          if (firstRow.getLength() != 0) {
+          if(firstRow.getLength() != 0) {
             keyIterators[i] = backingMaps[i].tailMap(firstKey).keySet().iterator();
             
           } else {
@@ -298,10 +304,10 @@
           }
           
           while(getNext(i)) {
-            if (!findFirstRow(i, firstRow)) {
+            if(! findFirstRow(i, firstRow)) {
               continue;
             }
-            if (columnMatch(i)) {
+            if(columnMatch(i)) {
               break;
             }
           }
@@ -331,7 +337,7 @@
      * @return - true if there is more data available
      */
     boolean getNext(int i) {
-      if (!keyIterators[i].hasNext()) {
+      if(! keyIterators[i].hasNext()) {
         closeSubScanner(i);
         return false;
       }
@@ -350,16 +356,16 @@
 
     /** Shut down map iterators, and release the lock */
     public void close() throws IOException {
-      if (!scannerClosed) {
+      if(! scannerClosed) {
         try {
           for(int i = 0; i < keys.length; i++) {
-            if (keyIterators[i] != null) {
+            if(keyIterators[i] != null) {
               closeSubScanner(i);
             }
           }
           
         } finally {
-          locking.releaseReadLock();
+          locker.readLock().unlock();
           scannerClosed = true;
         }
       }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java?view=diff&rev=532083&r1=532082&r2=532083
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMsg.java Tue Apr 24 14:13:08 2007
@@ -67,13 +67,13 @@
   // Writable
   //////////////////////////////////////////////////////////////////////////////
 
-  public void write(DataOutput out) throws IOException {
-    out.writeByte(msg);
-    info.write(out);
-  }
+   public void write(DataOutput out) throws IOException {
+     out.writeByte(msg);
+     info.write(out);
+   }
 
-  public void readFields(DataInput in) throws IOException {
-    this.msg = in.readByte();
-    this.info.readFields(in);
-  }
+   public void readFields(DataInput in) throws IOException {
+     this.msg = in.readByte();
+     this.info.readFields(in);
+   }
 }



Mime
View raw message