hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject svn commit: r573777 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/
Date Sat, 08 Sep 2007 03:09:43 GMT
Author: jimk
Date: Fri Sep  7 20:09:42 2007
New Revision: 573777

URL: http://svn.apache.org/viewvc?rev=573777&view=rev
Log:
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Sep  7 20:09:42 2007
@@ -27,6 +27,7 @@
     HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
                 (Ning Li via Stack)
     HADOOP-1800 output should default utf8 encoding
+    HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
     HADOOP-1814	TestCleanRegionServerExit fails too often on Hudson
     HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
     HADOOP-1832 listTables() returns duplicate tables

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri
Sep  7 20:09:42 2007
@@ -56,6 +56,7 @@
 
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 
 
@@ -102,7 +103,8 @@
 
   long metaRescanInterval;
 
-  final AtomicReference<HServerAddress> rootRegionLocation;
+  final AtomicReference<HServerAddress> rootRegionLocation =
+    new AtomicReference<HServerAddress>();
   
   Lock splitLogLock = new ReentrantLock();
 
@@ -359,8 +361,10 @@
       for (Text family: split.getTableDesc().families().keySet()) {
         Path p = HStoreFile.getMapDir(fs.makeQualified(dir),
             split.getRegionName(), HStoreKey.extractFamily(family));
+
         // Look for reference files.  Call listPaths with an anonymous
         // instance of PathFilter.
+
         Path [] ps = fs.listPaths(p,
             new PathFilter () {
               public boolean accept(Path path) {
@@ -368,7 +372,7 @@
               }
             }
         );
-        
+
         if (ps != null && ps.length > 0) {
           result = true;
           break;
@@ -393,7 +397,7 @@
     }
 
     protected void checkAssigned(final HRegionInfo info,
-        final String serverName, final long startCode) {
+        final String serverName, final long startCode) throws IOException {
       
       // Skip region - if ...
       if(info.offLine                                     // offline
@@ -445,6 +449,7 @@
             
           } catch (IOException e) {
             LOG.warn("unable to split region server log because: ", e);
+            throw e;
           }
         }
         
@@ -512,6 +517,14 @@
           // at least log it rather than go out silently.
           LOG.error("Unexpected exception", e);
         }
+        
+        // We ran out of tries. Make sure the file system is still available
+        
+        if (!FSUtils.isFileSystemAvailable(fs)) {
+          LOG.fatal("Shutting down hbase cluster: file system not available");
+          closed = true;
+        }
+        
         if (!closed) {
           // sleep before retry
 
@@ -675,6 +688,13 @@
           LOG.error("Unexpected exception", e);
         }
         
+        // We ran out of tries. Make sure the file system is still available
+        
+        if (!FSUtils.isFileSystemAvailable(fs)) {
+          LOG.fatal("Shutting down hbase cluster: file system not available");
+          closed = true;
+        }
+        
         if (!closed) {
           // sleep before retry
           try {
@@ -829,46 +849,56 @@
    * @throws IOException
    */
   public HMaster(Path dir, HServerAddress address, Configuration conf)
-  throws IOException {
+    throws IOException {
+    
     this.closed = true;
     this.dir = dir;
     this.conf = conf;
     this.fs = FileSystem.get(conf);
     this.rand = new Random();
-
-    // Make sure the root directory exists!
-    if(! fs.exists(dir)) {
-      fs.mkdirs(dir);
-    }
-
+    
     Path rootRegionDir =
       HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName);
     LOG.info("Root region dir: " + rootRegionDir.toString());
 
-    if (!fs.exists(rootRegionDir)) {
-      LOG.info("bootstrap: creating ROOT and first META regions");
-      try {
-        HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
-            this.conf, null);
-        
-        HRegion meta =
-          HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
-              null, null), this.dir, this.conf, null);
-        
-        // Add first region from the META table to the ROOT region.
-        
-        HRegion.addRegionToMETA(root, meta);
-        root.close();
-        root.getLog().closeAndDelete();
-        meta.close();
-        meta.getLog().closeAndDelete();
-      
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+    try {
+
+      // Make sure the root directory exists!
+
+      if(! fs.exists(dir)) {
+        fs.mkdirs(dir);
+      }
+
+      if (!fs.exists(rootRegionDir)) {
+        LOG.info("bootstrap: creating ROOT and first META regions");
+        try {
+          HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
+              this.conf, null);
+
+          HRegion meta =
+            HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
+                null, null), this.dir, this.conf, null);
+
+          // Add first region from the META table to the ROOT region.
+
+          HRegion.addRegionToMETA(root, meta);
+          root.close();
+          root.getLog().closeAndDelete();
+          meta.close();
+          meta.getLog().closeAndDelete();
+
+        } catch (IOException e) {
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          LOG.error("bootstrap", e);
+          throw e;
         }
-        LOG.error("bootstrap", e);
       }
+      
+    } catch (IOException e) {
+      LOG.fatal("Not starting HMaster because:", e);
+      return;
     }
 
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -898,7 +928,6 @@
 
     // The root region
 
-    this.rootRegionLocation = new AtomicReference<HServerAddress>();
     this.rootScanned = false;
     this.rootScanner = new RootScanner();
     this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
@@ -1038,9 +1067,15 @@
                 (RemoteException) ex);
 
           } catch (IOException e) {
+            ex = e;
             LOG.warn("main processing loop: " + op.toString(), e);
           }
         }
+        if (!FSUtils.isFileSystemAvailable(fs)) {
+          LOG.fatal("Shutting down hbase cluster: file system not available");
+          closed = true;
+          break;
+        }
         LOG.warn("Processing pending operations: " + op.toString(), ex);
         try {
           msgQueue.put(op);
@@ -2627,6 +2662,13 @@
 
         } catch (IOException e) {
           if (tries == numRetries - 1) {
+            // No retries left
+            
+            if (!FSUtils.isFileSystemAvailable(fs)) {
+              LOG.fatal("Shutting down hbase cluster: file system not available");
+              closed = true;
+            }
+
             if (e instanceof RemoteException) {
               e = RemoteExceptionHandler.decodeRemoteException(
                   (RemoteException) e);
@@ -2692,7 +2734,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-    throws IOException {
+      throws IOException {
       
       // Process regions not being served
       
@@ -2719,32 +2761,9 @@
         updateRegionInfo(b, i);
         b.delete(lockid, COL_SERVER);
         b.delete(lockid, COL_STARTCODE);
-
-        for (int tries = 0; tries < numRetries; tries++) {
-          try {
-            server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
-            
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("updated columns in row: " + i.regionName);
-            }
-            break;
-
-          } catch (IOException e) {
-            if (tries == numRetries - 1) {
-              if (e instanceof RemoteException) {
-                e = RemoteExceptionHandler.decodeRemoteException(
-                    (RemoteException) e);
-              }
-              LOG.error("column update failed in row: " + i.regionName, e);
-              break;
-            }
-          }
-          try {
-            Thread.sleep(threadWakeFrequency);
-
-          } catch (InterruptedException e) {
-            // continue
-          }
+        server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("updated columns in row: " + i.regionName);
         }
 
         if (online) {                           // Bring offline regions on-line
@@ -2795,7 +2814,7 @@
     }
 
     protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
-    throws IOException {
+      throws IOException {
       
       i.offLine = !online;
       b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
@@ -2815,7 +2834,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-    throws IOException {
+      throws IOException {
 
       // For regions that are being served, mark them for deletion      
       
@@ -2853,6 +2872,7 @@
   }
 
   private abstract class ColumnOperation extends TableOperation {
+    
     protected ColumnOperation(Text tableName) throws IOException {
       super(tableName);
     }
@@ -2874,31 +2894,9 @@
       BatchUpdate b = new BatchUpdate(rand.nextLong());
       long lockid = b.startUpdate(i.regionName);
       b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
-      
-      for (int tries = 0; tries < numRetries; tries++) {
-        try {
-          server.batchUpdate(regionName, System.currentTimeMillis(), b);
-        
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("updated columns in row: " + i.regionName);
-          }
-          break;
-        
-        } catch (IOException e) {
-          if (tries == numRetries - 1) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            }
-            LOG.error("column update failed in row: " + i.regionName, e);
-            break;
-          }
-        }
-        try {
-          Thread.sleep(threadWakeFrequency);
-          
-        } catch (InterruptedException e) {
-          // continue
-        }
+      server.batchUpdate(regionName, System.currentTimeMillis(), b);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("updated columns in row: " + i.regionName);
       }
     }
   }
@@ -2914,7 +2912,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-    throws IOException {
+      throws IOException {
 
       for (HRegionInfo i: unservedRegions) {
         i.tableDesc.families().remove(columnName);
@@ -2922,27 +2920,8 @@
 
         // Delete the directories used by the column
 
-        try {
-          fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
-
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          LOG.error("", e);
-        }
-
-        try {
-          fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
-
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-                (RemoteException) e);
-          }
-          LOG.error("", e);
-        }
+        fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
+        fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
       }
     }
   }
@@ -2958,7 +2937,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-    throws IOException {
+      throws IOException {
 
       for (HRegionInfo i: unservedRegions) {
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
(original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
Fri Sep  7 20:09:42 2007
@@ -334,7 +334,9 @@
       } catch (RuntimeException ex) {
         LOG.error("error initializing HMemcache scanner: ", ex);
         close();
-        throw ex;
+        IOException e = new IOException("error initializing HMemcache scanner");
+        e.initCause(ex);
+        throw e;
         
       } catch(IOException ex) {
         LOG.error("error initializing HMemcache scanner: ", ex);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
(original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
Fri Sep  7 20:09:42 2007
@@ -52,20 +52,20 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 
-/*******************************************************************************
+/**
  * HRegionServer makes a set of HRegions available to clients.  It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
- ******************************************************************************/
+ */
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
   
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public long getProtocolVersion(final String protocol, 
       @SuppressWarnings("unused") final long clientVersion)
-  throws IOException { 
+    throws IOException {
+    
     if (protocol.equals(HRegionInterface.class.getName())) {
       return HRegionInterface.versionID;
     }
@@ -79,7 +79,8 @@
   // of HRegionServer in isolation.
   protected volatile boolean stopRequested;
   
-  // Go down hard.  Used debugging and in unit tests.
+  // Go down hard.  Used if file system becomes unavailable and also in
+  // debugging and unit tests.
   protected volatile boolean abortRequested;
   
   final Path rootDir;
@@ -146,23 +147,23 @@
      * {@inheritDoc}
      */
     public void run() {
-      while(!stopRequested) {
+      while (!stopRequested) {
         long startTime = System.currentTimeMillis();
-        synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
+        synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
           // Grab a list of regions to check
-          Vector<HRegion> regionsToCheck = new Vector<HRegion>();
+          ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
           lock.readLock().lock();
           try {
             regionsToCheck.addAll(onlineRegions.values());
           } finally {
             lock.readLock().unlock();
           }
-          try {
-            for(HRegion cur: regionsToCheck) {
-              if(cur.isClosed()) {
-                // Skip if closed
-                continue;
-              }
+          for(HRegion cur: regionsToCheck) {
+            if(cur.isClosed()) {
+              // Skip if closed
+              continue;
+            }
+            try {
               if (cur.needsCompaction()) {
                 cur.compactStores();
               }
@@ -172,10 +173,13 @@
               if (cur.needsSplit(midKey)) {
                 split(cur, midKey);
               }
+            } catch(IOException e) {
+              //TODO: What happens if this fails? Are we toast?
+              LOG.error("Split or compaction failed", e);
+              if (!checkFileSystem()) {
+                break;
+              }
             }
-          } catch(IOException e) {
-            //TODO: What happens if this fails? Are we toast?
-            LOG.error("What happens if this fails? Are we toast?", e);
           }
         }
         
@@ -198,7 +202,8 @@
     }
     
     private void split(final HRegion region, final Text midKey)
-    throws IOException {
+      throws IOException {
+      
       final HRegionInfo oldRegionInfo = region.getRegionInfo();
       final HRegion[] newRegions = region.closeAndSplit(midKey, this);
       
@@ -286,7 +291,7 @@
 
         synchronized(cacheFlusherLock) {
           // Grab a list of items to flush
-          Vector<HRegion> toFlush = new Vector<HRegion>();
+          ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
           lock.readLock().lock();
           try {
             toFlush.addAll(onlineRegions.values());
@@ -310,7 +315,10 @@
                   iex = x;
                 }
               }
-              LOG.error("", iex);
+              LOG.error("Cache flush failed", iex);
+              if (!checkFileSystem()) {
+                break;
+              }
             }
           }
         }
@@ -332,7 +340,7 @@
   
   // File paths
   
-  private FileSystem fs;
+  FileSystem fs;
   
   // Logging
   
@@ -368,7 +376,10 @@
                   iex = x;
                 }
               }
-              LOG.warn("", iex);
+              LOG.error("Log rolling failed", iex);
+              if (!checkFileSystem()) {
+                break;
+              }
             }
           }
         }
@@ -737,7 +748,7 @@
             e = ex;
           }
         }
-        LOG.warn("Abort close of log", e);
+        LOG.error("Unable to close log in abort", e);
       }
       closeAllRegions(); // Don't leave any open file handles
       LOG.info("aborting server at: " +
@@ -902,6 +913,9 @@
             }
           } else {
             LOG.error("unable to process message: " + e.msg.toString(), ie);
+            if (!checkFileSystem()) {
+              break;
+            }
           }
         }
       }
@@ -973,117 +987,246 @@
     return regionsToClose;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
+  //
   // HRegionInterface
-  //////////////////////////////////////////////////////////////////////////////
+  //
 
   /** {@inheritDoc} */
   public HRegionInfo getRegionInfo(final Text regionName)
-  throws NotServingRegionException {
+    throws NotServingRegionException {
+    
     requestCount.incrementAndGet();
     return getRegion(regionName).getRegionInfo();
   }
 
   /** {@inheritDoc} */
   public byte [] get(final Text regionName, final Text row,
-      final Text column)
-  throws IOException {
+      final Text column) throws IOException {
+    
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column);
+    try {
+      return getRegion(regionName).get(row, column);
+      
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row,
-      final Text column, final int numVersions)
-  throws IOException {  
+      final Text column, final int numVersions) throws IOException {
+    
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column, numVersions);
+    try {
+      return getRegion(regionName).get(row, column, numVersions);
+      
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row, final Text column, 
       final long timestamp, final int numVersions) throws IOException {
+    
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column, timestamp, numVersions);
+    try {
+      return getRegion(regionName).get(row, column, timestamp, numVersions);
+      
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public MapWritable getRow(final Text regionName, final Text row)
-  throws IOException {
+    throws IOException {
+    
     requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    MapWritable result = new MapWritable();
-    TreeMap<Text, byte[]> map = region.getFull(row);
-    for (Map.Entry<Text, byte []> es: map.entrySet()) {
-      result.put(new HStoreKey(row, es.getKey()),
-          new ImmutableBytesWritable(es.getValue()));
+    try {
+      HRegion region = getRegion(regionName);
+      MapWritable result = new MapWritable();
+      TreeMap<Text, byte[]> map = region.getFull(row);
+      for (Map.Entry<Text, byte []> es: map.entrySet()) {
+        result.put(new HStoreKey(row, es.getKey()),
+            new ImmutableBytesWritable(es.getValue()));
+      }
+      return result;
+      
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
     }
-    return result;
   }
 
   /** {@inheritDoc} */
-  public MapWritable next(final long scannerId)
-  throws IOException {
+  public MapWritable next(final long scannerId) throws IOException {
+    
     requestCount.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-    HInternalScannerInterface s = scanners.get(scannerName);
-    if (s == null) {
-      throw new UnknownScannerException("Name: " + scannerName);
-    }
-    leases.renewLease(scannerId, scannerId);
-    
-    // Collect values to be returned here
-    
-    MapWritable values = new MapWritable();
-    
-    // Keep getting rows until we find one that has at least one non-deleted column value
-    
-    HStoreKey key = new HStoreKey();
-    TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
-    while (s.next(key, results)) {
-      for(Map.Entry<Text, byte []> e: results.entrySet()) {
-        HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
-        byte [] val = e.getValue();
-        if (HGlobals.deleteBytes.compareTo(val) == 0) {
-          // Column value is deleted. Don't return it.
-          continue;
+    try {
+      String scannerName = String.valueOf(scannerId);
+      HInternalScannerInterface s = scanners.get(scannerName);
+      if (s == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      leases.renewLease(scannerId, scannerId);
+
+      // Collect values to be returned here
+
+      MapWritable values = new MapWritable();
+
+      // Keep getting rows until we find one that has at least one non-deleted column value
+
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte []> results = new TreeMap<Text, byte []>();
+      while (s.next(key, results)) {
+        for(Map.Entry<Text, byte []> e: results.entrySet()) {
+          HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
+          byte [] val = e.getValue();
+          if (HGlobals.deleteBytes.compareTo(val) == 0) {
+            // Column value is deleted. Don't return it.
+            continue;
+          }
+          values.put(k, new ImmutableBytesWritable(val));
+        }
+
+        if(values.size() > 0) {
+          // Row has something in it. Return the value.
+          break;
         }
-        values.put(k, new ImmutableBytesWritable(val));
+
+        // No data for this row, go get another.
+
+        results.clear();
       }
+      return values;
       
-      if(values.size() > 0) {
-        // Row has something in it. Return the value.
-        break;
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
+    throws IOException {
+    
+    requestCount.incrementAndGet();
+    try {
+      long lockid = startUpdate(regionName, b.getRow());
+      for(BatchOperation op: b) {
+        switch(op.getOp()) {
+        case BatchOperation.PUT_OP:
+          put(regionName, lockid, op.getColumn(), op.getValue());
+          break;
+
+        case BatchOperation.DELETE_OP:
+          delete(regionName, lockid, op.getColumn());
+          break;
+        }
       }
+      commit(regionName, lockid, timestamp);
       
-      // No data for this row, go get another.
-      
-      results.clear();
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
     }
-    return values;
   }
+  
+  //
+  // remote scanner interface
+  //
 
   /** {@inheritDoc} */
-  public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
-  throws IOException {
+  public long openScanner(Text regionName, Text[] cols, Text firstRow,
+      final long timestamp, final RowFilterInterface filter)
+    throws IOException {
+    
     requestCount.incrementAndGet();
-    long lockid = startUpdate(regionName, b.getRow());
-    for(BatchOperation op: b) {
-      switch(op.getOp()) {
-      case BatchOperation.PUT_OP:
-        put(regionName, lockid, op.getColumn(), op.getValue());
-        break;
+    try {
+      HRegion r = getRegion(regionName);
+      long scannerId = -1L;
+      HInternalScannerInterface s =
+        r.getScanner(cols, firstRow, timestamp, filter);
+      scannerId = rand.nextLong();
+      String scannerName = String.valueOf(scannerId);
+      synchronized(scanners) {
+        scanners.put(scannerName, s);
+      }
+      leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
+      return scannerId;
 
-      case BatchOperation.DELETE_OP:
-        delete(regionName, lockid, op.getColumn());
-        break;
+    } catch (IOException e) {
+      if (e instanceof RemoteException) {
+        try {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        } catch (IOException x) {
+          e = x;
+        }
       }
+      LOG.error("", e);
+      checkFileSystem();
+      throw e;
     }
-    commit(regionName, lockid, timestamp);
   }
   
-  protected long startUpdate(Text regionName, Text row) 
-      throws IOException {
+  /** {@inheritDoc} */
+  public void close(final long scannerId) throws IOException {
+    requestCount.incrementAndGet();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      HInternalScannerInterface s = null;
+      synchronized(scanners) {
+        s = scanners.remove(scannerName);
+      }
+      if(s == null) {
+        throw new UnknownScannerException(scannerName);
+      }
+      s.close();
+      leases.cancelLease(scannerId, scannerId);
+      
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  Map<String, HInternalScannerInterface> scanners =
+    Collections.synchronizedMap(new HashMap<String,
+      HInternalScannerInterface>());
+
+  /** 
+   * Instantiated as a scanner lease.
+   * If the lease times out, the scanner is closed
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+    
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+    
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Scanner " + this.scannerName + " lease expired");
+      HInternalScannerInterface s = null;
+      synchronized(scanners) {
+        s = scanners.remove(this.scannerName);
+      }
+      if (s != null) {
+        s.close();
+      }
+    }
+  }
+  
+  //
+  // Methods that do the actual work for the remote API
+  //
+  
+  protected long startUpdate(Text regionName, Text row) throws IOException {
     
     HRegion region = getRegion(regionName);
     return region.startUpdate(row);
@@ -1097,7 +1240,7 @@
   }
 
   protected void delete(Text regionName, long lockid, Text column) 
-  throws IOException {
+    throws IOException {
 
     HRegion region = getRegion(regionName);
     region.delete(lockid, column);
@@ -1117,7 +1260,8 @@
    * @throws NotServingRegionException
    */
   protected HRegion getRegion(final Text regionName)
-  throws NotServingRegionException {
+    throws NotServingRegionException {
+    
     return getRegion(regionName, false);
   }
   
@@ -1129,9 +1273,9 @@
    * @return {@link HRegion} for <code>regionName</code>
    * @throws NotServingRegionException
    */
-  protected HRegion getRegion(final Text regionName, 
-      final boolean checkRetiringRegions)
-  throws NotServingRegionException {
+  protected HRegion getRegion(final Text regionName,
+      final boolean checkRetiringRegions) throws NotServingRegionException {
+    
     HRegion region = null;
     this.lock.readLock().lock();
     try {
@@ -1154,91 +1298,28 @@
       this.lock.readLock().unlock();
     }
   }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // remote scanner interface
-  //////////////////////////////////////////////////////////////////////////////
-
-  Map<String, HInternalScannerInterface> scanners =
-    Collections.synchronizedMap(new HashMap<String,
-      HInternalScannerInterface>());
-
-  /** 
-   * Instantiated as a scanner lease.
-   * If the lease times out, the scanner is closed
-   */
-  private class ScannerListener implements LeaseListener {
-    private final String scannerName;
-    
-    ScannerListener(final String n) {
-      this.scannerName = n;
-    }
-    
-    /**
-     * {@inheritDoc}
-     */
-    public void leaseExpired() {
-      LOG.info("Scanner " + this.scannerName + " lease expired");
-      HInternalScannerInterface s = null;
-      synchronized(scanners) {
-        s = scanners.remove(this.scannerName);
-      }
-      if (s != null) {
-        s.close();
-      }
-    }
-  }
-  
-  /**
-   * {@inheritDoc}
-   */
-  public long openScanner(Text regionName, Text[] cols, Text firstRow,
-      final long timestamp, final RowFilterInterface filter)
-  throws IOException {
-    requestCount.incrementAndGet();
-    HRegion r = getRegion(regionName);
-    long scannerId = -1L;
-    try {
-      HInternalScannerInterface s =
-        r.getScanner(cols, firstRow, timestamp, filter);
-      scannerId = rand.nextLong();
-      String scannerName = String.valueOf(scannerId);
-      synchronized(scanners) {
-        scanners.put(scannerName, s);
-      }
-      leases.createLease(scannerId, scannerId,
-        new ScannerListener(scannerName));
-    } catch (IOException e) {
-      if (e instanceof RemoteException) {
-        try {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        } catch (IOException x) {
-          e = x;
-        }
-      }
-      LOG.error("", e);
-      throw e;
-    }
-    return scannerId;
-  }
   
   /**
-   * {@inheritDoc}
+   * Checks to see if the file system is still accessible.
+   * If not, sets abortRequested and stopRequested
+   * 
+   * @return false if file system is not available
    */
-  public void close(final long scannerId) throws IOException {
-    requestCount.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-    HInternalScannerInterface s = null;
-    synchronized(scanners) {
-      s = scanners.remove(scannerName);
-    }
-    if(s == null) {
-      throw new UnknownScannerException(scannerName);
+  protected boolean checkFileSystem() {
+    boolean fsOk = true;
+    if (!FSUtils.isFileSystemAvailable(fs)) {
+      LOG.fatal("Shutting down HRegionServer: file system not available");
+      abortRequested = true;
+      stopRequested = true;
+      fsOk = false;
     }
-    s.close();
-    leases.cancelLease(scannerId, scannerId);
+    return fsOk;
   }
 
+  //
+  // Main program and support routines
+  //
+  
   private static void printUsageAndExit() {
     printUsageAndExit(null);
   }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Fri
Sep  7 20:09:42 2007
@@ -1298,6 +1298,9 @@
       } catch (Exception ex) {
         LOG.error("Failed construction", ex);
         close();
+        IOException e = new IOException("HStoreScanner failed construction");
+        e.initCause(ex);
+        throw e;
       }
     }
 

Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
(added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
Fri Sep  7 20:09:42 2007
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.dfs.DistributedFileSystem;
+
+/**
+ * Utility methods for interacting with the underlying file system.
+ */
+public class FSUtils {
+  private static final Log LOG = LogFactory.getLog(FSUtils.class);
+
+  private FSUtils() {}                                  // not instantiable
+  
+  /**
+   * Checks to see if the specified file system is available
+   * 
+   * @param fs
+   * @return true if the specified file system is available.
+   */
+  public static boolean isFileSystemAvailable(FileSystem fs) {
+    boolean available = false;
+    if (fs instanceof DistributedFileSystem) {
+      try {
+        if (((DistributedFileSystem) fs).getDataNodeStats().length > 0) {
+          available = true;
+          
+        } else {
+          LOG.fatal("file system unavailable: no data nodes");
+        }
+        
+      } catch (IOException e) {
+        LOG.fatal("file system unavailable because: ", e);
+      }
+      
+    } else {
+      available = true;
+    }
+    return available;
+  }
+}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
(original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
Fri Sep  7 20:09:42 2007
@@ -310,6 +310,32 @@
   }
   
   /**
+   * Wait for Mini HBase Cluster to shut down.
+   */
+  public void join() {
+    if (regionThreads != null) {
+      synchronized(regionThreads) {
+        for(Thread t: regionThreads) {
+          if (t.isAlive()) {
+            try {
+              t.join();
+            } catch (InterruptedException e) {
+              // continue
+            }
+          }
+        }
+      }
+    }
+    if (masterThread != null && masterThread.isAlive()) {
+      try {
+        masterThread.join();
+      } catch(InterruptedException e) {
+        // continue
+      }
+    }
+  }
+  
+  /**
    * Shut down HBase cluster started by calling
    * {@link #startMaster(Configuration)} and then
    * {@link #startRegionServers(Configuration, int)};

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
(original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
Fri Sep  7 20:09:42 2007
@@ -61,31 +61,32 @@
     value = System.getenv("DEBUGGING");
     if(value != null && value.equalsIgnoreCase("TRUE")) {
       debugging = true;
+    }
       
-      Logger rootLogger = Logger.getRootLogger();
-      // rootLogger.setLevel(Level.WARN);
+    Logger rootLogger = Logger.getRootLogger();
+    rootLogger.setLevel(Level.WARN);
 
-      Level logLevel = Level.INFO;
-      value = System.getenv("LOGGING_LEVEL");
-      if(value != null && value.length() != 0) {
-        if(value.equalsIgnoreCase("ALL")) {
-          logLevel = Level.ALL;
-        } else if(value.equalsIgnoreCase("DEBUG")) {
-          logLevel = Level.DEBUG;
-        } else if(value.equalsIgnoreCase("ERROR")) {
-          logLevel = Level.ERROR;
-        } else if(value.equalsIgnoreCase("FATAL")) {
-          logLevel = Level.FATAL;
-        } else if(value.equalsIgnoreCase("INFO")) {
-          logLevel = Level.INFO;
-        } else if(value.equalsIgnoreCase("OFF")) {
-          logLevel = Level.OFF;
-        } else if(value.equalsIgnoreCase("TRACE")) {
-          logLevel = Level.TRACE;
-        } else if(value.equalsIgnoreCase("WARN")) {
-          logLevel = Level.WARN;
-        }
+    Level logLevel = Level.DEBUG;
+    value = System.getenv("LOGGING_LEVEL");
+    if(value != null && value.length() != 0) {
+      if(value.equalsIgnoreCase("ALL")) {
+        logLevel = Level.ALL;
+      } else if(value.equalsIgnoreCase("DEBUG")) {
+        logLevel = Level.DEBUG;
+      } else if(value.equalsIgnoreCase("ERROR")) {
+        logLevel = Level.ERROR;
+      } else if(value.equalsIgnoreCase("FATAL")) {
+        logLevel = Level.FATAL;
+      } else if(value.equalsIgnoreCase("INFO")) {
+        logLevel = Level.INFO;
+      } else if(value.equalsIgnoreCase("OFF")) {
+        logLevel = Level.OFF;
+      } else if(value.equalsIgnoreCase("TRACE")) {
+        logLevel = Level.TRACE;
+      } else if(value.equalsIgnoreCase("WARN")) {
+        logLevel = Level.WARN;
       }
+
       ConsoleAppender consoleAppender = null;
       for(Enumeration<Appender> e = rootLogger.getAllAppenders();
           e.hasMoreElements();) {
@@ -103,8 +104,8 @@
           consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
         }
       }
-      Logger.getLogger(
-          HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
     }    
+    Logger.getLogger(
+        HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
   }
 }

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
(added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
Fri Sep  7 20:09:42 2007
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Test ability of HBase to handle DFS failure
+ */
+public class TestDFSAbort extends HBaseClusterTestCase {
+
+  /** constructor */
+  public TestDFSAbort() {
+    super();
+    conf.setInt("ipc.client.timeout", 5000);            // reduce ipc client timeout
+    conf.setInt("ipc.client.connect.max.retries", 5);   // and number of retries
+    conf.setInt("hbase.client.retries.number", 5);      // reduce HBase retries
+    Logger.getRootLogger().setLevel(Level.WARN);
+    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
+  }
+  
+  /** {@inheritDoc} */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+    
+    HTableDescriptor desc = new HTableDescriptor(getName());
+    desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
+    
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+  }
+  
+  /**
+   * @throws Exception
+   */
+  public void testDFSAbort() throws Exception {
+    
+    // By now the Mini DFS is running, Mini HBase is running and we have
+    // created a table. Now let's yank the rug out from HBase
+    
+    cluster.getDFSCluster().shutdown();
+    
+    // Now wait for Mini HBase Cluster to shut down
+    
+    cluster.join();
+  }
+}



Mime
View raw message