hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r540586 - in /lucene/hadoop/trunk/src/contrib/hbase: CHANGES.txt src/java/org/apache/hadoop/hbase/HMaster.java src/java/org/apache/hadoop/hbase/HRegionServer.java
Date Tue, 22 May 2007 13:51:11 GMT
Author: jimk
Date: Tue May 22 06:51:10 2007
New Revision: 540586

URL: http://svn.apache.org/viewvc?view=rev&rev=540586
Log:
HADOOP-1403. HBase reliability. Make master and region server more fault tolerant.

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?view=diff&rev=540586&r1=540585&r2=540586
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Tue May 22 06:51:10 2007
@@ -8,3 +8,5 @@
   3. HADOOP-1404. HBase command-line shutdown failing (Michael Stack)
   4. HADOOP-1397. Replace custom hbase locking with 
      java.util.concurrent.locks.ReentrantLock (Michael Stack)
+  5. HADOOP-1403. HBase reliability - make master and region server more fault
+     tolerant.

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?view=diff&rev=540586&r1=540585&r2=540586
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Tue
May 22 06:51:10 2007
@@ -55,6 +55,7 @@
   private FileSystem fs;
   private Random rand;
   private long threadWakeFrequency; 
+  private int numRetries;
   private long maxRegionOpenTime;
   
   // The 'msgQueue' is used to assign work to the client processor thread
@@ -181,7 +182,7 @@
             server.close(scannerId);
           }
         } catch (IOException e) {
-          e.printStackTrace();
+          LOG.error(e);
         }
         scannerId = -1L;
       }
@@ -284,7 +285,7 @@
           }
         }
       } catch(IOException e) {
-        e.printStackTrace();
+        LOG.error(e);
         closed = true;
       }
       LOG.debug("ROOT scanner exiting");
@@ -391,7 +392,7 @@
           } while(true);
 
         } catch(IOException e) {
-          e.printStackTrace();
+          LOG.error(e);
           closed = true;
         }
       }
@@ -480,6 +481,7 @@
 
     Path rootRegionDir =
       HStoreFile.getHRegionDir(dir, HGlobals.rootRegionInfo.regionName);
+    LOG.info("Root region dir: " + rootRegionDir.toString());
     if(! fs.exists(rootRegionDir)) {
       LOG.info("bootstrap: creating ROOT and first META regions");
       try {
@@ -492,11 +494,12 @@
         meta.close();
         
       } catch(IOException e) {
-        e.printStackTrace();
+        LOG.error(e);
       }
     }
 
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
+    this.numRetries =  conf.getInt("hbase.client.retries.number", 2);
     this.maxRegionOpenTime = conf.getLong("hbase.hbasemaster.maxregionopen", 30 * 1000);
     this.msgQueue = new Vector<PendingOperation>();
     this.serverLeases = new Leases(
@@ -575,7 +578,7 @@
     } catch(IOException e) {
       // Something happened during startup. Shut things down.
       this.closed = true;
-      e.printStackTrace();
+      LOG.error(e);
     }
 
     // Main processing loop
@@ -625,7 +628,7 @@
     } catch(Exception iex) {
       // Print if ever there is an interrupt (Just for kicks. Remove if it
       // ever happens).
-      iex.printStackTrace();
+      LOG.warn(iex);
     }
     try {
       // Join the thread till it finishes.
@@ -633,7 +636,7 @@
     } catch(Exception iex) {
       // Print if ever there is an interrupt (Just for kicks. Remove if it
       // ever happens).
-      iex.printStackTrace();
+      LOG.warn(iex);
     }
     try {
       // Join until its finished.  TODO: Maybe do in parallel in its own thread
@@ -642,7 +645,7 @@
     } catch(InterruptedException iex) {
       // Print if ever there is an interrupt (Just for kicks. Remove if it
       // ever happens).
-      iex.printStackTrace();
+      LOG.warn(iex);
     }
     
     if(LOG.isDebugEnabled()) {
@@ -1033,12 +1036,22 @@
 
       DataInputBuffer inbuf = new DataInputBuffer();
       try {
-        LabelledData[] values = null;
-
         while(true) {
+          LabelledData[] values = null;
+          
           HStoreKey key = new HStoreKey();
-          values = server.next(scannerId, key);
-          if(values.length == 0) {
+          try {
+            values = server.next(scannerId, key);
+            
+          } catch(NotServingRegionException e) {
+            throw e;
+            
+          } catch(IOException e) {
+            LOG.error(e);
+            break;
+          }
+          
+          if(values == null || values.length == 0) {
             break;
           }
 
@@ -1053,7 +1066,13 @@
             // No server
             continue;
           }
-          serverName = new String(bytes, UTF8_ENCODING);
+          try {
+            serverName = new String(bytes, UTF8_ENCODING);
+            
+          } catch(UnsupportedEncodingException e) {
+            LOG.error(e);
+            break;
+          }
 
           if(deadServer.compareTo(serverName) != 0) {
             // This isn't the server you're looking for - move along
@@ -1065,7 +1084,15 @@
             // No start code
             continue;
           }
-          long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+          long startCode = -1L;
+          
+          try {
+            startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+
+          } catch(UnsupportedEncodingException e) {
+            LOG.error(e);
+            break;
+          }
 
           if(oldStartCode != startCode) {
             // Close but no cigar
@@ -1080,7 +1107,14 @@
           }
           inbuf.reset(bytes, bytes.length);
           HRegionInfo info = new HRegionInfo();
-          info.readFields(inbuf);
+          
+          try {
+            info.readFields(inbuf);
+            
+          } catch(IOException e) {
+            LOG.error(e);
+            break;
+          }
 
           if(LOG.isDebugEnabled()) {
             LOG.debug(serverName + " was serving " + info.regionName);
@@ -1098,7 +1132,7 @@
             server.close(scannerId);
             
           } catch(IOException e) {
-            e.printStackTrace();
+            LOG.error(e);
             
           }
         }
@@ -1134,27 +1168,53 @@
       }
 
       // Scan the ROOT region
-      
-      waitForRootRegion();      // Wait until the root region is available
-      HRegionInterface server = client.getHRegionConnection(rootRegionLocation);
-      long scannerId = 
-        server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow);
-      
-      scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
+
+      HRegionInterface server = null;
+      long scannerId = -1L;
+      for(int tries = 0; tries < numRetries; tries ++) {
+        waitForRootRegion();      // Wait until the root region is available
+        server = client.getHRegionConnection(rootRegionLocation);
+        scannerId = -1L;
+        
+        try {
+          scannerId = server.openScanner(HGlobals.rootRegionInfo.regionName, columns, startRow);
+          scanMetaRegion(server, scannerId, HGlobals.rootRegionInfo.regionName);
+          break;
+          
+        } catch(NotServingRegionException e) {
+          if(tries == numRetries - 1) {
+            throw e;
+          }
+        }
+      }
 
       // We can not scan every meta region if they have not already been assigned
       // and scanned.
 
-      metaScanner.waitForMetaScan();
+      for(int tries = 0; tries < numRetries; tries ++) {
+        try {
+          metaScanner.waitForMetaScan();
       
-      for(Iterator<MetaRegion> i = knownMetaRegions.values().iterator();
-          i.hasNext(); ) {
-        
-        MetaRegion r = i.next();
+          for(Iterator<MetaRegion> i = knownMetaRegions.values().iterator();
+              i.hasNext(); ) {
+          
+            server = null;
+            scannerId = -1L;
+            MetaRegion r = i.next();
 
-        server = client.getHRegionConnection(r.server);
-        scannerId = server.openScanner(r.regionName, columns, startRow);
-        scanMetaRegion(server, scannerId, r.regionName);
+            server = client.getHRegionConnection(r.server);
+          
+            scannerId = server.openScanner(r.regionName, columns, startRow);
+            scanMetaRegion(server, scannerId, r.regionName);
+            
+          }
+          break;
+            
+        } catch(NotServingRegionException e) {
+          if(tries == numRetries - 1) {
+            throw e;
+          }
+        }
       }
     }
   }
@@ -1183,42 +1243,54 @@
     }
     
     public void process() throws IOException {
-      
-      // We can not access any meta region if they have not already been assigned
-      // and scanned.
+      for(int tries = 0; tries < numRetries; tries ++) {
 
-      metaScanner.waitForMetaScan();
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("region closed: " + regionInfo.regionName);
-      }
+        // We can not access any meta region if they have not already been assigned
+        // and scanned.
 
-      // Mark the Region as unavailable in the appropriate meta table
+        metaScanner.waitForMetaScan();
+
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("region closed: " + regionInfo.regionName);
+        }
+
+        // Mark the Region as unavailable in the appropriate meta table
+
+        Text metaRegionName;
+        HRegionInterface server;
+        if (rootRegion) {
+          metaRegionName = HGlobals.rootRegionInfo.regionName;
+          waitForRootRegion();            // Make sure root region available
+          server = client.getHRegionConnection(rootRegionLocation);
 
-      Text metaRegionName;
-      HRegionInterface server;
-      if (rootRegion) {
-        metaRegionName = HGlobals.rootRegionInfo.regionName;
-        waitForRootRegion();            // Make sure root region available
-        server = client.getHRegionConnection(rootRegionLocation);
-        
-      } else {
-        MetaRegion r = null;
-        if(knownMetaRegions.containsKey(regionInfo.regionName)) {
-          r = knownMetaRegions.get(regionInfo.regionName);
-          
         } else {
-          r = knownMetaRegions.get(
-              knownMetaRegions.headMap(regionInfo.regionName).lastKey());
+          MetaRegion r = null;
+          if(knownMetaRegions.containsKey(regionInfo.regionName)) {
+            r = knownMetaRegions.get(regionInfo.regionName);
+
+          } else {
+            r = knownMetaRegions.get(
+                knownMetaRegions.headMap(regionInfo.regionName).lastKey());
+          }
+          metaRegionName = r.regionName;
+          server = client.getHRegionConnection(r.server);
+        }
+
+        try {
+          long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
+          server.delete(metaRegionName, clientId, lockid, COL_SERVER);
+          server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
+          server.commit(metaRegionName, clientId, lockid);
+          break;
+
+        } catch(NotServingRegionException e) {
+          if(tries == numRetries - 1) {
+            throw e;
+          }
+          continue;
         }
-        metaRegionName = r.regionName;
-        server = client.getHRegionConnection(r.server);
       }
-      long lockid = server.startUpdate(metaRegionName, clientId, regionInfo.regionName);
-      server.delete(metaRegionName, clientId, lockid, COL_SERVER);
-      server.delete(metaRegionName, clientId, lockid, COL_STARTCODE);
-      server.commit(metaRegionName, clientId, lockid);
-      
+
       if(reassignRegion) {
         if(LOG.isDebugEnabled()) {
           LOG.debug("reassign region: " + regionInfo.regionName);
@@ -1261,51 +1333,61 @@
             String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
         
       } catch(UnsupportedEncodingException e) {
-        e.printStackTrace();
+        LOG.error(e);
       }
 
     }
     
     public void process() throws IOException {
+      for(int tries = 0; tries < numRetries; tries ++) {
 
-      // We can not access any meta region if they have not already been assigned
-      // and scanned.
+        // We can not access any meta region if they have not already been assigned
+        // and scanned.
 
-      metaScanner.waitForMetaScan();
-      
-      if(LOG.isDebugEnabled()) {
-        LOG.debug(regionName + " open on "
-            + new String(serverAddress.get(), UTF8_ENCODING));
-      }
+        metaScanner.waitForMetaScan();
 
-      // Register the newly-available Region's location.
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(regionName + " open on "
+              + new String(serverAddress.get(), UTF8_ENCODING));
+        }
+
+        // Register the newly-available Region's location.
+
+        Text metaRegionName;
+        HRegionInterface server;
+        if(rootRegion) {
+          metaRegionName = HGlobals.rootRegionInfo.regionName;
+          waitForRootRegion();            // Make sure root region available
+          server = client.getHRegionConnection(rootRegionLocation);
 
-      Text metaRegionName;
-      HRegionInterface server;
-      if(rootRegion) {
-        metaRegionName = HGlobals.rootRegionInfo.regionName;
-        waitForRootRegion();            // Make sure root region available
-        server = client.getHRegionConnection(rootRegionLocation);
-        
-      } else {
-        MetaRegion r = null;
-        if(knownMetaRegions.containsKey(regionName)) {
-          r = knownMetaRegions.get(regionName);
-          
         } else {
-          r = knownMetaRegions.get(
-              knownMetaRegions.headMap(regionName).lastKey());
+          MetaRegion r = null;
+          if(knownMetaRegions.containsKey(regionName)) {
+            r = knownMetaRegions.get(regionName);
+
+          } else {
+            r = knownMetaRegions.get(
+                knownMetaRegions.headMap(regionName).lastKey());
+          }
+          metaRegionName = r.regionName;
+          server = client.getHRegionConnection(r.server);
+        }
+        if(LOG.isDebugEnabled()) {
+          LOG.debug("updating row " + regionName + " in table " + metaRegionName);
+        }
+        try {
+          long lockid = server.startUpdate(metaRegionName, clientId, regionName);
+          server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
+          server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
+          server.commit(metaRegionName, clientId, lockid);
+          break;
+          
+        } catch(NotServingRegionException e) {
+          if(tries == numRetries - 1) {
+            throw e;
+          }
         }
-        metaRegionName = r.regionName;
-        server = client.getHRegionConnection(r.server);
-      }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("updating row " + regionName + " in table " + metaRegionName);
       }
-      long lockid = server.startUpdate(metaRegionName, clientId, regionName);
-      server.put(metaRegionName, clientId, lockid, COL_SERVER, serverAddress);
-      server.put(metaRegionName, clientId, lockid, COL_STARTCODE, startCode);
-      server.commit(metaRegionName, clientId, lockid);
     }
   }
 
@@ -1323,65 +1405,75 @@
       throw new IllegalStateException(MASTER_NOT_RUNNING);
     }
     HRegionInfo newRegion = new HRegionInfo(rand.nextLong(), desc, null, null);
-    
-    // We can not access any meta region if they have not already been assigned
-    // and scanned.
 
-    metaScanner.waitForMetaScan();
-    
-    // 1. Check to see if table already exists
+    for(int tries = 0; tries < numRetries; tries++) {
+      try {
+        // We can not access any meta region if they have not already been assigned
+        // and scanned.
 
-    MetaRegion m = null;
-    if(knownMetaRegions.containsKey(newRegion.regionName)) {
-      m = knownMetaRegions.get(newRegion.regionName);
-      
-    } else {
-      m = knownMetaRegions.get(
-          knownMetaRegions.headMap(newRegion.regionName).lastKey());
-    }
-    Text metaRegionName = m.regionName;
-    HRegionInterface server = client.getHRegionConnection(m.server);
+        metaScanner.waitForMetaScan();
 
+        // 1. Check to see if table already exists
 
-    BytesWritable bytes = server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
-    if(bytes != null && bytes.getSize() != 0) {
-      byte[] infoBytes = bytes.get();
-      DataInputBuffer inbuf = new DataInputBuffer();
-      inbuf.reset(infoBytes, infoBytes.length);
-      HRegionInfo info = new HRegionInfo();
-      info.readFields(inbuf);
-      if(info.tableDesc.getName().compareTo(desc.getName()) == 0) {
-        throw new IOException("table already exists");
+        MetaRegion m = null;
+        if(knownMetaRegions.containsKey(newRegion.regionName)) {
+          m = knownMetaRegions.get(newRegion.regionName);
+
+        } else {
+          m = knownMetaRegions.get(
+              knownMetaRegions.headMap(newRegion.regionName).lastKey());
+        }
+        Text metaRegionName = m.regionName;
+        HRegionInterface server = client.getHRegionConnection(m.server);
+
+
+        BytesWritable bytes = server.get(metaRegionName, desc.getName(), COL_REGIONINFO);
+        if(bytes != null && bytes.getSize() != 0) {
+          byte[] infoBytes = bytes.get();
+          DataInputBuffer inbuf = new DataInputBuffer();
+          inbuf.reset(infoBytes, infoBytes.length);
+          HRegionInfo info = new HRegionInfo();
+          info.readFields(inbuf);
+          if(info.tableDesc.getName().compareTo(desc.getName()) == 0) {
+            throw new IOException("table already exists");
+          }
+        }
+
+        // 2. Create the HRegion
+
+        HRegion r = createNewHRegion(desc, newRegion.regionId);
+
+        // 3. Insert into meta
+
+        HRegionInfo info = r.getRegionInfo();
+        Text regionName = r.getRegionName();
+        ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
+        DataOutputStream s = new DataOutputStream(byteValue);
+        info.write(s);
+
+        long clientId = rand.nextLong();
+        long lockid = server.startUpdate(metaRegionName, clientId, regionName);
+        server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, 
+            new BytesWritable(byteValue.toByteArray()));
+        server.commit(metaRegionName, clientId, lockid);
+
+        // 4. Close the new region to flush it to disk
+
+        r.close();
+
+        // 5. Get it assigned to a server
+
+        unassignedRegions.put(regionName, info);
+        assignAttempts.put(regionName, 0L);
+        break;
+
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          throw e;
+        }
       }
     }
     
-    // 2. Create the HRegion
-    
-    HRegion r = createNewHRegion(desc, newRegion.regionId);
-    
-    // 3. Insert into meta
-    
-    HRegionInfo info = r.getRegionInfo();
-    Text regionName = r.getRegionName();
-    ByteArrayOutputStream byteValue = new ByteArrayOutputStream();
-    DataOutputStream s = new DataOutputStream(byteValue);
-    info.write(s);
-
-    long clientId = rand.nextLong();
-    long lockid = server.startUpdate(metaRegionName, clientId, regionName);
-    server.put(metaRegionName, clientId, lockid, COL_REGIONINFO, 
-        new BytesWritable(byteValue.toByteArray()));
-    server.commit(metaRegionName, clientId, lockid);
-    
-    // 4. Close the new region to flush it to disk
-    
-    r.close();
-    
-    // 5. Get it assigned to a server
-    
-    unassignedRegions.put(regionName, info);
-    assignAttempts.put(regionName, 0L);
-    
     if(LOG.isDebugEnabled()) {
       LOG.debug("created table " + desc.getName());
     }
@@ -1438,186 +1530,198 @@
     if (!isMasterRunning()) {
       throw new IllegalStateException(MASTER_NOT_RUNNING);
     }
-    
-    // We can not access any meta region if they have not already been assigned
-    // and scanned.
 
-    metaScanner.waitForMetaScan();
-    
-    Text firstMetaRegion = null;
-    if(knownMetaRegions.size() == 1) {
-      firstMetaRegion = knownMetaRegions.firstKey();
-      
-    } else if(knownMetaRegions.containsKey(tableName)) {
-      firstMetaRegion = tableName;
-      
-    } else {
-      firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
-    }
+    for(int tries = 0; tries < numRetries; tries++) {
+      try {
+        // We can not access any meta region if they have not already been
+        // assigned and scanned.
 
-    synchronized(metaScannerLock) {     // Prevent meta scanner from running
-      for(Iterator<MetaRegion> it =
-          knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
-          it.hasNext(); ) {
+        metaScanner.waitForMetaScan();
 
-        // Find all the regions that make up this table
+        Text firstMetaRegion = null;
+        if(knownMetaRegions.size() == 1) {
+          firstMetaRegion = knownMetaRegions.firstKey();
 
-        MetaRegion m = it.next();
-        HRegionInterface server = client.getHRegionConnection(m.server);
+        } else if(knownMetaRegions.containsKey(tableName)) {
+          firstMetaRegion = tableName;
 
-        // Rows in the meta table we will need to delete
+        } else {
+          firstMetaRegion = knownMetaRegions.headMap(tableName).lastKey();
+        }
 
-        Vector<Text> rowsToDelete = new Vector<Text>();
+        synchronized(metaScannerLock) {     // Prevent meta scanner from running
+          for(Iterator<MetaRegion> it =
+              knownMetaRegions.tailMap(firstMetaRegion).values().iterator();
+              it.hasNext(); ) {
 
-        // Regions that are being served. We will get the HRegionServers
-        // to delete them for us, but we don't tell them that until after
-        // we are done scanning to prevent lock contention
+            // Find all the regions that make up this table
 
-        TreeMap<String, TreeMap<Text, HRegionInfo>> localKillList =
-          new TreeMap<String, TreeMap<Text, HRegionInfo>>();
+            MetaRegion m = it.next();
+            HRegionInterface server = client.getHRegionConnection(m.server);
 
-        // Regions that are not being served. We will have to delete
-        // them ourselves
+            // Rows in the meta table we will need to delete
 
-        TreeSet<Text> unservedRegions = new TreeSet<Text>();
+            Vector<Text> rowsToDelete = new Vector<Text>();
 
-        long scannerId = -1L;
-        try {
-          scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
+            // Regions that are being served. We will get the HRegionServers
+            // to delete them for us, but we don't tell them that until after
+            // we are done scanning to prevent lock contention
 
+            TreeMap<String, TreeMap<Text, HRegionInfo>> localKillList =
+              new TreeMap<String, TreeMap<Text, HRegionInfo>>();
 
-          DataInputBuffer inbuf = new DataInputBuffer();
-          byte[] bytes;
-          while(true) {
-            LabelledData[] values = null;
-            HStoreKey key = new HStoreKey();
-            values = server.next(scannerId, key);
-            if(values == null || values.length == 0) {
-              break;
-            }
-            TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-            for(int i = 0; i < values.length; i++) {
-              bytes = new byte[values[i].getData().getSize()];
-              System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
-              results.put(values[i].getLabel(), bytes);
-            }
-            bytes = results.get(COL_REGIONINFO);
-            if(bytes == null || bytes.length == 0) {
-              break;
-            }
-            inbuf.reset(bytes, bytes.length);
-            HRegionInfo info = new HRegionInfo();
-            info.readFields(inbuf);
+            // Regions that are not being served. We will have to delete
+            // them ourselves
 
-            if(info.tableDesc.getName().compareTo(tableName) > 0) {
-              break;                      // Beyond any more entries for this table
-            }
+            TreeSet<Text> unservedRegions = new TreeSet<Text>();
 
-            rowsToDelete.add(info.regionName);
+            long scannerId = -1L;
+            try {
+              scannerId = server.openScanner(m.regionName, METACOLUMNS, tableName);
+
+              DataInputBuffer inbuf = new DataInputBuffer();
+              byte[] bytes;
+              while(true) {
+                LabelledData[] values = null;
+                HStoreKey key = new HStoreKey();
+                values = server.next(scannerId, key);
+                if(values == null || values.length == 0) {
+                  break;
+                }
+                TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+                for(int i = 0; i < values.length; i++) {
+                  bytes = new byte[values[i].getData().getSize()];
+                  System.arraycopy(values[i].getData().get(), 0, bytes, 0, bytes.length);
+                  results.put(values[i].getLabel(), bytes);
+                }
+                bytes = results.get(COL_REGIONINFO);
+                if(bytes == null || bytes.length == 0) {
+                  break;
+                }
+                inbuf.reset(bytes, bytes.length);
+                HRegionInfo info = new HRegionInfo();
+                info.readFields(inbuf);
 
-            // Is it being served?
+                if(info.tableDesc.getName().compareTo(tableName) > 0) {
+                  break;               // Beyond any more entries for this table
+                }
 
-            bytes = results.get(COL_SERVER);
-            if(bytes != null && bytes.length != 0) {
-              String serverName = new String(bytes, UTF8_ENCODING);
+                rowsToDelete.add(info.regionName);
 
-              bytes = results.get(COL_STARTCODE);
-              if(bytes != null && bytes.length != 0) {
-                long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+                // Is it being served?
 
-                HServerInfo s = serversToServerInfo.get(serverName);
-                if(s != null && s.getStartCode() == startCode) {
+                bytes = results.get(COL_SERVER);
+                if(bytes != null && bytes.length != 0) {
+                  String serverName = new String(bytes, UTF8_ENCODING);
+
+                  bytes = results.get(COL_STARTCODE);
+                  if(bytes != null && bytes.length != 0) {
+                    long startCode = Long.valueOf(new String(bytes, UTF8_ENCODING));
+
+                    HServerInfo s = serversToServerInfo.get(serverName);
+                    if(s != null && s.getStartCode() == startCode) {
+
+                      // It is being served.
+                      // Tell the server to stop it and not report back.
+
+                      TreeMap<Text, HRegionInfo> regionsToKill =
+                        localKillList.get(serverName);
+
+                      if(regionsToKill == null) {
+                        regionsToKill = new TreeMap<Text, HRegionInfo>();
+                      }
+                      regionsToKill.put(info.regionName, info);
+                      localKillList.put(serverName, regionsToKill);
+                      continue;
+                    }
+                  }
+                }
 
-                  // It is being served.
-                  // Tell the server to stop it and not report back.
+                // Region is not currently being served.
+                // Prevent it from getting assigned and add it to the list of
+                // regions we need to delete here.
+
+                unassignedRegions.remove(info.regionName);
+                assignAttempts.remove(info.regionName);
+                unservedRegions.add(info.regionName);
+              }
 
-                  TreeMap<Text, HRegionInfo> regionsToKill =
-                    localKillList.get(serverName);
+            } finally {
+              if(scannerId != -1L) {
+                try {
+                  server.close(scannerId);
 
-                  if(regionsToKill == null) {
-                    regionsToKill = new TreeMap<Text, HRegionInfo>();
-                  }
-                  regionsToKill.put(info.regionName, info);
-                  localKillList.put(serverName, regionsToKill);
-                  continue;
+                } catch(IOException e) {
+                  LOG.error(e);
                 }
               }
+              scannerId = -1L;
             }
-            
-            // Region is not currently being served.
-            // Prevent it from getting assigned and add it to the list of
-            // regions we need to delete here.
-            
-            unassignedRegions.remove(info.regionName);
-            assignAttempts.remove(info.regionName);
-            unservedRegions.add(info.regionName);
-          }
-          
-        } catch(IOException e) {
-          e.printStackTrace();
 
-        } finally {
-          if(scannerId != -1L) {
-            try {
-              server.close(scannerId);
+            // Wipe the existence of the regions out of the meta table
 
-            } catch(IOException e) {
-              e.printStackTrace();
+            for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
+              Text rowName = row.next();
+              if(LOG.isDebugEnabled()) {
+                LOG.debug("deleting columns in row: " + rowName);
+              }
+              long lockid = -1L;
+              long clientId = rand.nextLong();
+              try {
+                lockid = server.startUpdate(m.regionName, clientId, rowName);
+                server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
+                server.delete(m.regionName, clientId, lockid, COL_SERVER);
+                server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
+                server.commit(m.regionName, clientId, lockid);
+                lockid = -1L;
+                if(LOG.isDebugEnabled()) {
+                  LOG.debug("deleted columns in row: " + rowName);
+                }
 
+              } catch(IOException e) {
+                if(lockid != -1L) {
+                  server.abort(m.regionName, clientId, lockid);
+                }
+                LOG.error("columns deletion failed in row: " + rowName);
+                LOG.error(e);
+                throw e;
+              }
             }
-          }
-          scannerId = -1L;
-        }
 
-        // Wipe the existence of the regions out of the meta table
-        
-        for(Iterator<Text> row = rowsToDelete.iterator(); row.hasNext(); ) {
-          Text rowName = row.next();
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("deleting columns in row: " + rowName);
-          }
-          long lockid = -1L;
-          long clientId = rand.nextLong();
-          try {
-            lockid = server.startUpdate(m.regionName, clientId, rowName);
-            server.delete(m.regionName, clientId, lockid, COL_REGIONINFO);
-            server.delete(m.regionName, clientId, lockid, COL_SERVER);
-            server.delete(m.regionName, clientId, lockid, COL_STARTCODE);
-            server.commit(m.regionName, clientId, lockid);
-            lockid = -1L;
-            if(LOG.isDebugEnabled()) {
-              LOG.debug("deleted columns in row: " + rowName);
+            // Notify region servers that some regions need to be closed and deleted
+
+            if(localKillList.size() != 0) {
+              killList.putAll(localKillList);
             }
 
-          } catch(Exception e) {
-            if(lockid != -1L) {
-              server.abort(m.regionName, clientId, lockid);
+            // Delete any regions that are not being served
+
+            for(Iterator<Text> i = unservedRegions.iterator(); i.hasNext(); ) {
+              Text regionName = i.next();
+              try {
+                HRegion.deleteRegion(fs, dir, regionName);
+
+              } catch(IOException e) {
+                LOG.error("failed to delete region " + regionName);
+                LOG.error(e);
+                throw e;
+              }
             }
-            LOG.error("columns deletion failed in row: " + rowName);
-            LOG.error(e);
           }
         }
-        
-        // Notify region servers that some regions need to be closed and deleted
-        
-        if(localKillList.size() != 0) {
-          killList.putAll(localKillList);
+      } catch(NotServingRegionException e) {
+        if(tries == numRetries - 1) {
+          throw e;
         }
+        continue;
 
-        // Delete any regions that are not being served
-        
-        for(Iterator<Text> i = unservedRegions.iterator(); i.hasNext(); ) {
-          Text regionName = i.next();
-          try {
-            HRegion.deleteRegion(fs, dir, regionName);
-            
-          } catch(IOException e) {
-            LOG.error("failed to delete region " + regionName);
-            LOG.error(e);
-          }
-        }
+      } catch(IOException e) {
+        LOG.error(e);
+        throw e;
       }
+      break;
     }
+
     if(LOG.isDebugEnabled()) {
       LOG.debug("deleted table: " + tableName);
     }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?view=diff&rev=540586&r1=540585&r2=540586
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
(original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
Tue May 22 06:51:10 2007
@@ -57,6 +57,7 @@
   private long threadWakeFrequency;
   private int maxLogEntries;
   private long msgInterval;
+  private int numRetries;
   
   // Check to see if regions should be split
   
@@ -102,15 +103,15 @@
           try {
             for(Iterator<HRegion>it = regionsToCheck.iterator(); it.hasNext(); ) {
               HRegion cur = it.next();
-              
+
               if(cur.isClosed()) {
                 continue;                               // Skip if closed
               }
-              
+
               if(cur.needsCompaction()) {
-                
+
                 // The best time to split a region is right after it has been compacted
-                
+
                 if(cur.compactStores()) {
                   Text midKey = new Text();
                   if(cur.needsSplit(midKey)) {
@@ -132,47 +133,58 @@
                       (oldRegion.find(META_TABLE_NAME.toString()) == 0) ?
                           ROOT_TABLE_NAME : META_TABLE_NAME;
 
-                    client.openTable(tableToUpdate);
-                    long lockid = client.startUpdate(oldRegion);
-                    client.delete(lockid, COL_REGIONINFO);
-                    client.delete(lockid, COL_SERVER);
-                    client.delete(lockid, COL_STARTCODE);
-                    client.commit(lockid);
-
-                    for(int i = 0; i < newRegions.length; i++) {
-                      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
-                      DataOutputStream out = new DataOutputStream(bytes);
-                      newRegions[i].getRegionInfo().write(out);
-
-                      lockid = client.startUpdate(newRegions[i].getRegionName());
-                      client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
-                      client.put(lockid, COL_SERVER, 
-                          info.getServerAddress().toString().getBytes(UTF8_ENCODING));
-                      client.put(lockid, COL_STARTCODE, 
-                          String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
-                      client.commit(lockid);
-                    }
-                    
-                    // Now tell the master about the new regions
-
-                    if(LOG.isDebugEnabled()) {
-                      LOG.debug("reporting region split to master");
-                    }
-
-                    reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
-
-                    LOG.info("region split successful. old region=" + oldRegion
-                        + ", new regions: " + newRegions[0].getRegionName() + ", "
-                        + newRegions[1].getRegionName());
-
-                    // Finally, start serving the new regions
-                    
-                    lock.writeLock().lock();
-                    try {
-                      regions.put(newRegions[0].getRegionName(), newRegions[0]);
-                      regions.put(newRegions[1].getRegionName(), newRegions[1]);
-                    } finally {
-                      lock.writeLock().unlock();
+                    for(int tries = 0; tries < numRetries; tries++) {
+                      try {
+                        client.openTable(tableToUpdate);
+                        long lockid = client.startUpdate(oldRegion);
+                        client.delete(lockid, COL_REGIONINFO);
+                        client.delete(lockid, COL_SERVER);
+                        client.delete(lockid, COL_STARTCODE);
+                        client.commit(lockid);
+
+                        for(int i = 0; i < newRegions.length; i++) {
+                          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+                          DataOutputStream out = new DataOutputStream(bytes);
+                          newRegions[i].getRegionInfo().write(out);
+
+                          lockid = client.startUpdate(newRegions[i].getRegionName());
+                          client.put(lockid, COL_REGIONINFO, bytes.toByteArray());
+                          client.put(lockid, COL_SERVER, 
+                              info.getServerAddress().toString().getBytes(UTF8_ENCODING));
+                          client.put(lockid, COL_STARTCODE, 
+                              String.valueOf(info.getStartCode()).getBytes(UTF8_ENCODING));
+                          client.commit(lockid);
+                        }
+
+                        // Now tell the master about the new regions
+
+                        if(LOG.isDebugEnabled()) {
+                          LOG.debug("reporting region split to master");
+                        }
+
+                        reportSplit(newRegions[0].getRegionInfo(), newRegions[1].getRegionInfo());
+
+                        LOG.info("region split successful. old region=" + oldRegion
+                            + ", new regions: " + newRegions[0].getRegionName() + ", "
+                            + newRegions[1].getRegionName());
+
+                        // Finally, start serving the new regions
+
+                        lock.writeLock().lock();
+                        try {
+                          regions.put(newRegions[0].getRegionName(), newRegions[0]);
+                          regions.put(newRegions[1].getRegionName(), newRegions[1]);
+                        } finally {
+                          lock.writeLock().unlock();
+                        }
+
+                      } catch(NotServingRegionException e) {
+                        if(tries == numRetries - 1) {
+                          throw e;
+                        }
+                        continue;
+                      }
+                      break;
                     }
                   }
                 }
@@ -183,7 +195,7 @@
             LOG.error(e);
           }
         }
-        
+
         // Sleep
         long waitTime = stopRequested ? 0
             : splitOrCompactCheckFrequency - (System.currentTimeMillis() - startTime);
@@ -241,7 +253,7 @@
               cur.optionallyFlush();
 
             } catch(IOException iex) {
-              iex.printStackTrace();
+              LOG.error(iex);
             }
           }
         }
@@ -503,9 +515,6 @@
             } catch(InterruptedException iex) {
             }
           }
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Wake");
-          }
         }
         continue;
       }
@@ -617,7 +626,7 @@
       join();
       
     } catch(IOException e) {
-      e.printStackTrace();
+      LOG.error(e);
     }
     if(LOG.isDebugEnabled()) {
       LOG.debug("main thread exiting");
@@ -747,7 +756,7 @@
             throw new IOException("Impossible state during msg processing.  Instruction:
" + msg);
           }
         } catch(IOException e) {
-          e.printStackTrace();
+          LOG.error(e);
         }
       }
       if(LOG.isDebugEnabled()) {
@@ -944,7 +953,7 @@
         localRegion.abort(localLockId);
         
       } catch(IOException iex) {
-        iex.printStackTrace();
+        LOG.error(iex);
       }
     }
   }
@@ -1071,7 +1080,7 @@
       leases.createLease(scannerName, scannerName, new ScannerListener(scannerName));
     
     } catch(IOException e) {
-      e.printStackTrace();
+      LOG.error(e);
       throw e;
     }
     return scannerId;



Mime
View raw message