From: jimk@apache.org
To: hadoop-commits@lucene.apache.org
Subject: svn commit: r573777 - in /lucene/hadoop/trunk/src/contrib/hbase: ./ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/util/ src/test/org/apache/hadoop/hbase/
Date: Sat, 08 Sep 2007 03:09:43 -0000

Author: jimk
Date: Fri Sep 7 20:09:42 2007
New Revision: 573777

URL: http://svn.apache.org/viewvc?rev=573777&view=rev
Log:
HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully

Added:
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Fri Sep 7 20:09:42 2007
@@ -27,6 +27,7 @@
    HADOOP-1785 TableInputFormat.TableRecordReader.next has a bug
                (Ning Li via Stack)
    HADOOP-1800 output should default utf8 encoding
+   HADOOP-1801 When hdfs is yanked out from under hbase, hbase should go down gracefully
    HADOOP-1814 TestCleanRegionServerExit fails too often on Hudson
    HADOOP-1821 Replace all String.getBytes() with String.getBytes("UTF-8")
    HADOOP-1832 listTables() returns duplicate tables
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMaster.java Fri Sep 7 20:09:42 2007
@@ -56,6 +56,7 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 
@@ -102,7 +103,8 @@
   long metaRescanInterval;
 
-  final AtomicReference<HServerAddress> rootRegionLocation;
+  final AtomicReference<HServerAddress> rootRegionLocation =
+    new AtomicReference<HServerAddress>();
 
   Lock splitLogLock = new ReentrantLock();
 
@@ -359,8 +361,10 @@
       for (Text family: split.getTableDesc().families().keySet()) {
         Path p = HStoreFile.getMapDir(fs.makeQualified(dir),
           split.getRegionName(), HStoreKey.extractFamily(family));
+
         // Look for reference files.  Call listPaths with an anonymous
         // instance of PathFilter.
+
         Path [] ps = fs.listPaths(p,
           new PathFilter () {
             public boolean accept(Path path) {
@@ -368,7 +372,7 @@
             }
           }
         );
-        
+
         if (ps != null && ps.length > 0) {
           result = true;
           break;
@@ -393,7 +397,7 @@
   }
 
   protected void checkAssigned(final HRegionInfo info,
-    final String serverName, final long startCode) {
+    final String serverName, final long startCode) throws IOException {
 
     // Skip region - if ...
     if(info.offLine                                 // offline
@@ -445,6 +449,7 @@
       } catch (IOException e) {
         LOG.warn("unable to split region server log because: ", e);
+        throw e;
       }
     }
 
@@ -512,6 +517,14 @@
         // at least log it rather than go out silently.
         LOG.error("Unexpected exception", e);
       }
+
+      // We ran out of tries. Make sure the file system is still available
+
+      if (!FSUtils.isFileSystemAvailable(fs)) {
+        LOG.fatal("Shutting down hbase cluster: file system not available");
+        closed = true;
+      }
+
       if (!closed) {
         // sleep before retry
@@ -675,6 +688,13 @@
         LOG.error("Unexpected exception", e);
       }
+
+      // We ran out of tries. Make sure the file system is still available
+
+      if (!FSUtils.isFileSystemAvailable(fs)) {
+        LOG.fatal("Shutting down hbase cluster: file system not available");
+        closed = true;
+      }
+
       if (!closed) {
         // sleep before retry
         try {
@@ -829,46 +849,56 @@
    * @throws IOException
    */
   public HMaster(Path dir, HServerAddress address, Configuration conf)
-    throws IOException {
+  throws IOException {
+
+    this.closed = true;
     this.dir = dir;
     this.conf = conf;
     this.fs = FileSystem.get(conf);
     this.rand = new Random();
-    
-    // Make sure the root directory exists!
-    if(! fs.exists(dir)) {
-      fs.mkdirs(dir);
-    }
-
     Path rootRegionDir =
       HRegion.getRegionDir(dir, HGlobals.rootRegionInfo.regionName);
     LOG.info("Root region dir: " + rootRegionDir.toString());
 
-    if (!fs.exists(rootRegionDir)) {
-      LOG.info("bootstrap: creating ROOT and first META regions");
-      try {
-        HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
-          this.conf, null);
-
-        HRegion meta =
-          HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
-            null, null), this.dir, this.conf, null);
-
-        // Add first region from the META table to the ROOT region.
-
-        HRegion.addRegionToMETA(root, meta);
-        root.close();
-        root.getLog().closeAndDelete();
-        meta.close();
-        meta.getLog().closeAndDelete();
-
-      } catch (IOException e) {
-        if (e instanceof RemoteException) {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        }
-        LOG.error("bootstrap", e);
-      }
-    }
+    try {
+
+      // Make sure the root directory exists!
+
+      if(! fs.exists(dir)) {
+        fs.mkdirs(dir);
+      }
+
+      if (!fs.exists(rootRegionDir)) {
+        LOG.info("bootstrap: creating ROOT and first META regions");
+        try {
+          HRegion root = HRegion.createHRegion(HGlobals.rootRegionInfo, this.dir,
+            this.conf, null);
+
+          HRegion meta =
+            HRegion.createHRegion(new HRegionInfo(1L, HGlobals.metaTableDesc,
+              null, null), this.dir, this.conf, null);
+
+          // Add first region from the META table to the ROOT region.
+
+          HRegion.addRegionToMETA(root, meta);
+          root.close();
+          root.getLog().closeAndDelete();
+          meta.close();
+          meta.getLog().closeAndDelete();
+
+        } catch (IOException e) {
+          if (e instanceof RemoteException) {
+            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+          }
+          LOG.error("bootstrap", e);
+          throw e;
+        }
+      }
+
+    } catch (IOException e) {
+      LOG.fatal("Not starting HMaster because:", e);
+      return;
+    }
 
     this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
@@ -898,7 +928,6 @@
 
     // The root region
 
-    this.rootRegionLocation = new AtomicReference<HServerAddress>();
     this.rootScanned = false;
     this.rootScanner = new RootScanner();
     this.rootScannerThread = new Thread(rootScanner, "HMaster.rootScanner");
@@ -1038,9 +1067,15 @@
             (RemoteException) ex);
         } catch (IOException e) {
+          ex = e;
           LOG.warn("main processing loop: " + op.toString(), e);
         }
       }
+      if (!FSUtils.isFileSystemAvailable(fs)) {
+        LOG.fatal("Shutting down hbase cluster: file system not available");
+        closed = true;
+        break;
+      }
       LOG.warn("Processing pending operations: " + op.toString(), ex);
       try {
         msgQueue.put(op);
@@ -2627,6 +2662,13 @@
         } catch (IOException e) {
           if (tries == numRetries - 1) {
+            // No retries left
+
+            if (!FSUtils.isFileSystemAvailable(fs)) {
+              LOG.fatal("Shutting down hbase cluster: file system not available");
+              closed = true;
+            }
+
             if (e instanceof RemoteException) {
               e = RemoteExceptionHandler.decodeRemoteException(
                 (RemoteException) e);
             }
@@ -2692,7 +2734,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-      throws IOException {
+        throws IOException {
 
       // Process regions not being served
 
@@ -2719,32 +2761,9 @@
         updateRegionInfo(b, i);
         b.delete(lockid, COL_SERVER);
         b.delete(lockid, COL_STARTCODE);
-
-        for (int tries = 0; tries < numRetries; tries++) {
-          try {
-            server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("updated columns in row: " + i.regionName);
-            }
-            break;
-
-          } catch (IOException e) {
-            if (tries == numRetries - 1) {
-              if (e instanceof RemoteException) {
-                e = RemoteExceptionHandler.decodeRemoteException(
-                  (RemoteException) e);
-              }
-              LOG.error("column update failed in row: " + i.regionName, e);
-              break;
-            }
-          }
-          try {
-            Thread.sleep(threadWakeFrequency);
-
-          } catch (InterruptedException e) {
-            // continue
-          }
+        server.batchUpdate(m.getRegionName(), System.currentTimeMillis(), b);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("updated columns in row: " + i.regionName);
         }
 
         if (online) {                         // Bring offline regions on-line
@@ -2795,7 +2814,7 @@
   }
 
   protected void updateRegionInfo(final BatchUpdate b, final HRegionInfo i)
-    throws IOException {
+      throws IOException {
 
     i.offLine = !online;
    b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
 
@@ -2815,7 +2834,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-      throws IOException {
+        throws IOException {
 
       // For regions that are being served, mark them for deletion
 
@@ -2853,6 +2872,7 @@
   }
 
   private abstract class ColumnOperation extends TableOperation {
+
     protected ColumnOperation(Text tableName) throws IOException {
       super(tableName);
     }
@@ -2874,31 +2894,9 @@
       BatchUpdate b = new BatchUpdate(rand.nextLong());
       long lockid = b.startUpdate(i.regionName);
       b.put(lockid, COL_REGIONINFO, Writables.getBytes(i));
-
-      for (int tries = 0; tries < numRetries; tries++) {
-        try {
-          server.batchUpdate(regionName, System.currentTimeMillis(), b);
-
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("updated columns in row: " + i.regionName);
-          }
-          break;
-
-        } catch (IOException e) {
-          if (tries == numRetries - 1) {
-            if (e instanceof RemoteException) {
-              e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-            }
-            LOG.error("column update failed in row: " + i.regionName, e);
-            break;
-          }
-        }
-        try {
-          Thread.sleep(threadWakeFrequency);
-
-        } catch (InterruptedException e) {
-          // continue
-        }
+      server.batchUpdate(regionName, System.currentTimeMillis(), b);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("updated columns in row: " + i.regionName);
       }
     }
   }
@@ -2914,7 +2912,7 @@
 
     @Override
     protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-      throws IOException {
+        throws IOException {
 
       for (HRegionInfo i: unservedRegions) {
         i.tableDesc.families().remove(columnName);
@@ -2922,27 +2920,8 @@
 
         // Delete the directories used by the column
 
-        try {
-          fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
-
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-              (RemoteException) e);
-          }
-          LOG.error("", e);
-        }
-
-        try {
-          fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
-
-        } catch (IOException e) {
-          if (e instanceof RemoteException) {
-            e = RemoteExceptionHandler.decodeRemoteException(
-              (RemoteException) e);
-          }
-          LOG.error("", e);
-        }
+        fs.delete(HStoreFile.getMapDir(dir, i.regionName, columnName));
+        fs.delete(HStoreFile.getInfoDir(dir, i.regionName, columnName));
       }
     }
   }
@@ -2958,7 +2937,7 @@
 
    @Override
    protected void postProcessMeta(MetaRegion m, HRegionInterface server)
-      throws IOException {
+        throws IOException {
 
      for (HRegionInfo i: unservedRegions) {
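The HMaster hunks above all apply one pattern: a failed operation is retried as before, but once the retries are exhausted the master probes DFS through the new FSUtils helper and, if the file system is gone, sets closed so its main loop exits instead of retrying forever. A minimal sketch of that shape, using hypothetical stand-ins (doWork, numRetries, fs, LOG, closed refer to the surrounding class in this patch, not to new API). This is illustrative only, not the committed code:

    for (int tries = 0; tries < numRetries; tries++) {
      try {
        doWork();                  // hypothetical operation against DFS
        break;
      } catch (IOException e) {
        if (tries == numRetries - 1) {
          // No retries left: distinguish "operation failed" from "DFS is gone".
          if (!FSUtils.isFileSystemAvailable(fs)) {
            LOG.fatal("Shutting down hbase cluster: file system not available");
            closed = true;         // master main loop exits
          }
          if (e instanceof RemoteException) {
            e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
          }
          throw e;                 // surface the original failure
        }
      }
    }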
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HMemcache.java Fri Sep 7 20:09:42 2007
@@ -334,7 +334,9 @@
       } catch (RuntimeException ex) {
         LOG.error("error initializing HMemcache scanner: ", ex);
         close();
-        throw ex;
+        IOException e = new IOException("error initializing HMemcache scanner");
+        e.initCause(ex);
+        throw e;
 
       } catch(IOException ex) {
         LOG.error("error initializing HMemcache scanner: ", ex);
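The same wrapping idiom appears here and in the HStore change further down: a RuntimeException thrown during scanner construction is converted into a checked IOException, with the original failure preserved as the cause via Throwable.initCause (Java 6 later added an IOException(String, Throwable) constructor; on the Java 5 platform of this era, initCause is the way to chain). A sketch of the idiom in isolation, with openScanner() as a hypothetical stand-in for the failing call:

    try {
      openScanner();                         // hypothetical failing call
    } catch (RuntimeException ex) {
      IOException e = new IOException("error initializing scanner");
      e.initCause(ex);                       // keep the original stack trace as the cause
      throw e;                               // callers now see a checked IOException
    }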
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HRegionServer.java Fri Sep 7 20:09:42 2007
@@ -52,20 +52,20 @@
 import org.apache.hadoop.hbase.io.BatchUpdate;
 import org.apache.hadoop.hbase.io.BatchOperation;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Writables;
 
-/*******************************************************************************
+/**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.
- ******************************************************************************/
+ */
 public class HRegionServer implements HConstants, HRegionInterface, Runnable {
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   public long getProtocolVersion(final String protocol,
       @SuppressWarnings("unused") final long clientVersion)
-    throws IOException {
+  throws IOException {
+
     if (protocol.equals(HRegionInterface.class.getName())) {
       return HRegionInterface.versionID;
     }
@@ -79,7 +79,8 @@
   // of HRegionServer in isolation.
   protected volatile boolean stopRequested;
 
-  // Go down hard.  Used debugging and in unit tests.
+  // Go down hard. Used if file system becomes unavailable and also in
+  // debugging and unit tests.
   protected volatile boolean abortRequested;
 
   final Path rootDir;
@@ -146,23 +147,23 @@
    * {@inheritDoc}
    */
   public void run() {
-    while(!stopRequested) {
+    while (!stopRequested) {
       long startTime = System.currentTimeMillis();
-      synchronized(splitOrCompactLock) { // Don't interrupt us while we're working
+      synchronized (splitOrCompactLock) { // Don't interrupt us while we're working
         // Grab a list of regions to check
-        Vector<HRegion> regionsToCheck = new Vector<HRegion>();
+        ArrayList<HRegion> regionsToCheck = new ArrayList<HRegion>();
         lock.readLock().lock();
         try {
          regionsToCheck.addAll(onlineRegions.values());
        } finally {
          lock.readLock().unlock();
        }
-        try {
-          for(HRegion cur: regionsToCheck) {
-            if(cur.isClosed()) {
-              // Skip if closed
-              continue;
-            }
+        for(HRegion cur: regionsToCheck) {
+          if(cur.isClosed()) {
+            // Skip if closed
+            continue;
+          }
+          try {
            if (cur.needsCompaction()) {
              cur.compactStores();
            }
@@ -172,10 +173,13 @@
            if (cur.needsSplit(midKey)) {
              split(cur, midKey);
            }
+          } catch(IOException e) {
+            //TODO: What happens if this fails? Are we toast?
+            LOG.error("Split or compaction failed", e);
+            if (!checkFileSystem()) {
+              break;
+            }
          }
-        } catch(IOException e) {
-          //TODO: What happens if this fails? Are we toast?
-          LOG.error("What happens if this fails? Are we toast?", e);
        }
      }
 
@@ -198,7 +202,8 @@
   }
 
   private void split(final HRegion region, final Text midKey)
-    throws IOException {
+  throws IOException {
+
     final HRegionInfo oldRegionInfo = region.getRegionInfo();
     final HRegion[] newRegions = region.closeAndSplit(midKey, this);
 
@@ -286,7 +291,7 @@
       synchronized(cacheFlusherLock) {
 
         // Grab a list of items to flush
 
-        Vector<HRegion> toFlush = new Vector<HRegion>();
+        ArrayList<HRegion> toFlush = new ArrayList<HRegion>();
         lock.readLock().lock();
         try {
           toFlush.addAll(onlineRegions.values());
@@ -310,7 +315,10 @@
               iex = x;
             }
           }
-          LOG.error("", iex);
+          LOG.error("Cache flush failed", iex);
+          if (!checkFileSystem()) {
+            break;
+          }
         }
       }
     }
@@ -332,7 +340,7 @@
 
   // File paths
 
-  private FileSystem fs;
+  FileSystem fs;
 
   // Logging
 
@@ -368,7 +376,10 @@
             iex = x;
           }
         }
-        LOG.warn("", iex);
+        LOG.error("Log rolling failed", iex);
+        if (!checkFileSystem()) {
+          break;
+        }
       }
     }
   }
@@ -737,7 +748,7 @@
           e = ex;
         }
       }
-      LOG.warn("Abort close of log", e);
+      LOG.error("Unable to close log in abort", e);
     }
     closeAllRegions(); // Don't leave any open file handles
     LOG.info("aborting server at: " +
@@ -902,6 +913,9 @@
           }
         } else {
           LOG.error("unable to process message: " + e.msg.toString(), ie);
+          if (!checkFileSystem()) {
+            break;
+          }
         }
       }
     }
@@ -973,117 +987,246 @@
     return regionsToClose;
   }
 
-  //////////////////////////////////////////////////////////////////////////////
+  //
   // HRegionInterface
-  //////////////////////////////////////////////////////////////////////////////
+  //
 
   /** {@inheritDoc} */
   public HRegionInfo getRegionInfo(final Text regionName)
-    throws NotServingRegionException {
+  throws NotServingRegionException {
+
     requestCount.incrementAndGet();
     return getRegion(regionName).getRegionInfo();
   }
 
   /** {@inheritDoc} */
   public byte [] get(final Text regionName, final Text row,
-      final Text column)
-  throws IOException {
+      final Text column) throws IOException {
+
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column);
+    try {
+      return getRegion(regionName).get(row, column);
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row,
-      final Text column, final int numVersions)
-  throws IOException {
+      final Text column, final int numVersions) throws IOException {
+
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column, numVersions);
+    try {
+      return getRegion(regionName).get(row, column, numVersions);
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public byte [][] get(final Text regionName, final Text row, final Text column,
       final long timestamp, final int numVersions) throws IOException {
+
     requestCount.incrementAndGet();
-    return getRegion(regionName).get(row, column, timestamp, numVersions);
+    try {
+      return getRegion(regionName).get(row, column, timestamp, numVersions);
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
   public MapWritable getRow(final Text regionName, final Text row)
-    throws IOException {
+  throws IOException {
+
     requestCount.incrementAndGet();
-    HRegion region = getRegion(regionName);
-    MapWritable result = new MapWritable();
-    TreeMap<Text, byte[]> map = region.getFull(row);
-    for (Map.Entry<Text, byte[]> es: map.entrySet()) {
-      result.put(new HStoreKey(row, es.getKey()),
-        new ImmutableBytesWritable(es.getValue()));
-    }
-    return result;
+    try {
+      HRegion region = getRegion(regionName);
+      MapWritable result = new MapWritable();
+      TreeMap<Text, byte[]> map = region.getFull(row);
+      for (Map.Entry<Text, byte[]> es: map.entrySet()) {
+        result.put(new HStoreKey(row, es.getKey()),
+          new ImmutableBytesWritable(es.getValue()));
+      }
+      return result;
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
   /** {@inheritDoc} */
-  public MapWritable next(final long scannerId)
-    throws IOException {
-    requestCount.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-    HInternalScannerInterface s = scanners.get(scannerName);
-    if (s == null) {
-      throw new UnknownScannerException("Name: " + scannerName);
-    }
-    leases.renewLease(scannerId, scannerId);
-
-    // Collect values to be returned here
-
-    MapWritable values = new MapWritable();
-
-    // Keep getting rows until we find one that has at least one non-deleted column value
-
-    HStoreKey key = new HStoreKey();
-    TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
-    while (s.next(key, results)) {
-      for(Map.Entry<Text, byte[]> e: results.entrySet()) {
-        HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
-        byte [] val = e.getValue();
-        if (HGlobals.deleteBytes.compareTo(val) == 0) {
-          // Column value is deleted. Don't return it.
-          continue;
-        }
-        values.put(k, new ImmutableBytesWritable(val));
-      }
-
-      if(values.size() > 0) {
-        // Row has something in it. Return the value.
-        break;
-      }
-
-      // No data for this row, go get another.
-
-      results.clear();
-    }
-    return values;
-  }
+  public MapWritable next(final long scannerId) throws IOException {
+
+    requestCount.incrementAndGet();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      HInternalScannerInterface s = scanners.get(scannerName);
+      if (s == null) {
+        throw new UnknownScannerException("Name: " + scannerName);
+      }
+      leases.renewLease(scannerId, scannerId);
+
+      // Collect values to be returned here
+
+      MapWritable values = new MapWritable();
+
+      // Keep getting rows until we find one that has at least one non-deleted column value
+
+      HStoreKey key = new HStoreKey();
+      TreeMap<Text, byte[]> results = new TreeMap<Text, byte[]>();
+      while (s.next(key, results)) {
+        for(Map.Entry<Text, byte[]> e: results.entrySet()) {
+          HStoreKey k = new HStoreKey(key.getRow(), e.getKey(), key.getTimestamp());
+          byte [] val = e.getValue();
+          if (HGlobals.deleteBytes.compareTo(val) == 0) {
+            // Column value is deleted. Don't return it.
+            continue;
+          }
+          values.put(k, new ImmutableBytesWritable(val));
+        }
+
+        if(values.size() > 0) {
+          // Row has something in it. Return the value.
+          break;
+        }
+
+        // No data for this row, go get another.
+
+        results.clear();
+      }
+      return values;
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
 
   /** {@inheritDoc} */
   public void batchUpdate(Text regionName, long timestamp, BatchUpdate b)
-    throws IOException {
+  throws IOException {
+
     requestCount.incrementAndGet();
-    long lockid = startUpdate(regionName, b.getRow());
-    for(BatchOperation op: b) {
-      switch(op.getOp()) {
-      case BatchOperation.PUT_OP:
-        put(regionName, lockid, op.getColumn(), op.getValue());
-        break;
-
-      case BatchOperation.DELETE_OP:
-        delete(regionName, lockid, op.getColumn());
-        break;
-      }
-    }
-    commit(regionName, lockid, timestamp);
+    try {
+      long lockid = startUpdate(regionName, b.getRow());
+      for(BatchOperation op: b) {
+        switch(op.getOp()) {
+        case BatchOperation.PUT_OP:
+          put(regionName, lockid, op.getColumn(), op.getValue());
+          break;
+
+        case BatchOperation.DELETE_OP:
+          delete(regionName, lockid, op.getColumn());
+          break;
+        }
+      }
+      commit(regionName, lockid, timestamp);
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
   }
 
+  //
+  // remote scanner interface
+  //
+
+  /** {@inheritDoc} */
+  public long openScanner(Text regionName, Text[] cols, Text firstRow,
+    final long timestamp, final RowFilterInterface filter)
+  throws IOException {
+
+    requestCount.incrementAndGet();
+    try {
+      HRegion r = getRegion(regionName);
+      long scannerId = -1L;
+      HInternalScannerInterface s =
+        r.getScanner(cols, firstRow, timestamp, filter);
+      scannerId = rand.nextLong();
+      String scannerName = String.valueOf(scannerId);
+      synchronized(scanners) {
+        scanners.put(scannerName, s);
+      }
+      leases.createLease(scannerId, scannerId, new ScannerListener(scannerName));
+      return scannerId;
+
+    } catch (IOException e) {
+      if (e instanceof RemoteException) {
+        try {
+          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
+        } catch (IOException x) {
+          e = x;
+        }
+      }
+      LOG.error("", e);
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  /** {@inheritDoc} */
+  public void close(final long scannerId) throws IOException {
+    requestCount.incrementAndGet();
+    try {
+      String scannerName = String.valueOf(scannerId);
+      HInternalScannerInterface s = null;
+      synchronized(scanners) {
+        s = scanners.remove(scannerName);
+      }
+      if(s == null) {
+        throw new UnknownScannerException(scannerName);
+      }
+      s.close();
+      leases.cancelLease(scannerId, scannerId);
+
+    } catch (IOException e) {
+      checkFileSystem();
+      throw e;
+    }
+  }
+
+  Map<String, HInternalScannerInterface> scanners =
+    Collections.synchronizedMap(new HashMap<String, HInternalScannerInterface>());
+
+  /**
+   * Instantiated as a scanner lease.
+   * If the lease times out, the scanner is closed
+   */
+  private class ScannerListener implements LeaseListener {
+    private final String scannerName;
+
+    ScannerListener(final String n) {
+      this.scannerName = n;
+    }
+
+    /** {@inheritDoc} */
+    public void leaseExpired() {
+      LOG.info("Scanner " + this.scannerName + " lease expired");
+      HInternalScannerInterface s = null;
+      synchronized(scanners) {
+        s = scanners.remove(this.scannerName);
+      }
+      if (s != null) {
+        s.close();
+      }
+    }
+  }
+
+  //
+  // Methods that do the actual work for the remote API
+  //
+
+  protected long startUpdate(Text regionName, Text row) throws IOException {
     HRegion region = getRegion(regionName);
     return region.startUpdate(row);
 
@@ -1097,7 +1240,7 @@
   }
 
   protected void delete(Text regionName, long lockid, Text column)
-    throws IOException {
+  throws IOException {
     HRegion region = getRegion(regionName);
     region.delete(lockid, column);
 
@@ -1117,7 +1260,8 @@
    * @throws NotServingRegionException
    */
   protected HRegion getRegion(final Text regionName)
-    throws NotServingRegionException {
+  throws NotServingRegionException {
+
     return getRegion(regionName, false);
   }
 
@@ -1129,9 +1273,9 @@
    * @return {@link HRegion} for regionName
    * @throws NotServingRegionException
    */
-  protected HRegion getRegion(final Text regionName,
-    final boolean checkRetiringRegions)
-    throws NotServingRegionException {
+  protected HRegion getRegion(final Text regionName, 
+    final boolean checkRetiringRegions) throws NotServingRegionException {
+
     HRegion region = null;
     this.lock.readLock().lock();
     try {
@@ -1154,91 +1298,28 @@
       this.lock.readLock().unlock();
     }
   }
-
-  //////////////////////////////////////////////////////////////////////////////
-  // remote scanner interface
-  //////////////////////////////////////////////////////////////////////////////
-
-  Map<String, HInternalScannerInterface> scanners =
-    Collections.synchronizedMap(new HashMap<String, HInternalScannerInterface>());
-
-  /**
-   * Instantiated as a scanner lease.
-   * If the lease times out, the scanner is closed
-   */
-  private class ScannerListener implements LeaseListener {
-    private final String scannerName;
-
-    ScannerListener(final String n) {
-      this.scannerName = n;
-    }
-
-    /**
-     * {@inheritDoc}
-     */
-    public void leaseExpired() {
-      LOG.info("Scanner " + this.scannerName + " lease expired");
-      HInternalScannerInterface s = null;
-      synchronized(scanners) {
-        s = scanners.remove(this.scannerName);
-      }
-      if (s != null) {
-        s.close();
-      }
-    }
-  }
-
-  /**
-   * {@inheritDoc}
-   */
-  public long openScanner(Text regionName, Text[] cols, Text firstRow,
-      final long timestamp, final RowFilterInterface filter)
-    throws IOException {
-    requestCount.incrementAndGet();
-    HRegion r = getRegion(regionName);
-    long scannerId = -1L;
-    try {
-      HInternalScannerInterface s =
-        r.getScanner(cols, firstRow, timestamp, filter);
-      scannerId = rand.nextLong();
-      String scannerName = String.valueOf(scannerId);
-      synchronized(scanners) {
-        scanners.put(scannerName, s);
-      }
-      leases.createLease(scannerId, scannerId,
-        new ScannerListener(scannerName));
-    } catch (IOException e) {
-      if (e instanceof RemoteException) {
-        try {
-          e = RemoteExceptionHandler.decodeRemoteException((RemoteException) e);
-        } catch (IOException x) {
-          e = x;
-        }
-      }
-      LOG.error("", e);
-      throw e;
-    }
-    return scannerId;
-  }
 
   /**
-   * {@inheritDoc}
+   * Checks to see if the file system is still accessible.
+   * If not, sets abortRequested and stopRequested
+   *
+   * @return false if file system is not available
    */
-  public void close(final long scannerId) throws IOException {
-    requestCount.incrementAndGet();
-    String scannerName = String.valueOf(scannerId);
-    HInternalScannerInterface s = null;
-    synchronized(scanners) {
-      s = scanners.remove(scannerName);
-    }
-    if(s == null) {
-      throw new UnknownScannerException(scannerName);
+  protected boolean checkFileSystem() {
+    boolean fsOk = true;
+    if (!FSUtils.isFileSystemAvailable(fs)) {
+      LOG.fatal("Shutting down HRegionServer: file system not available");
+      abortRequested = true;
+      stopRequested = true;
+      fsOk = false;
     }
-    s.close();
-    leases.cancelLease(scannerId, scannerId);
+    return fsOk;
   }
 
+  //
+  // Main program and support routines
+  //
+
   private static void printUsageAndExit() {
     printUsageAndExit(null);
   }
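Every client-facing HRegionServer method now has the same shape: bump the request counter, do the work, and on IOException consult checkFileSystem() before rethrowing, so a vanished DFS flips abortRequested and stopRequested rather than letting the server limp along. That repetition could in principle be factored out; one way, sketched with a hypothetical IoCall interface that is not part of this commit (Java 5 has no lambdas, so call sites would use anonymous classes):

    // Hypothetical helper, illustrative only.
    interface IoCall<T> {
      T run() throws IOException;
    }

    <T> T guarded(IoCall<T> call) throws IOException {
      try {
        return call.run();
      } catch (IOException e) {
        checkFileSystem();   // side effect: requests abort if DFS is gone
        throw e;             // caller still sees the original failure
      }
    }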
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/HStore.java Fri Sep 7 20:09:42 2007
@@ -1298,6 +1298,9 @@
       } catch (Exception ex) {
         LOG.error("Failed construction", ex);
         close();
+        IOException e = new IOException("HStoreScanner failed construction");
+        e.initCause(ex);
+        throw e;
       }
     }
 
Added: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/util/FSUtils.java Fri Sep 7 20:09:42 2007
@@ -0,0 +1,63 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.dfs.DistributedFileSystem;
+
+/**
+ * Utility methods for interacting with the underlying file system.
+ */
+public class FSUtils {
+  private static final Log LOG = LogFactory.getLog(FSUtils.class);
+
+  private FSUtils() {}                        // not instantiable
+
+  /**
+   * Checks to see if the specified file system is available
+   *
+   * @param fs
+   * @return true if the specified file system is available.
+   */
+  public static boolean isFileSystemAvailable(FileSystem fs) {
+    boolean available = false;
+    if (fs instanceof DistributedFileSystem) {
+      try {
+        if (((DistributedFileSystem) fs).getDataNodeStats().length > 0) {
+          available = true;
+
+        } else {
+          LOG.fatal("file system unavailable: no data nodes");
+        }
+
+      } catch (IOException e) {
+        LOG.fatal("file system unavailable because: ", e);
+      }
+
+    } else {
+      available = true;
+    }
+    return available;
+  }
+}
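isFileSystemAvailable decides availability by asking the NameNode for DataNode statistics: an RPC that fails, or a report of zero live DataNodes, both count as unavailable, while any non-DFS FileSystem (for example the local file system used in unit tests) is assumed healthy. A sketch of how a daemon loop might poll it; the fields and the thirty-second interval are assumptions for illustration, not code from this commit:

    while (!closed) {
      if (!FSUtils.isFileSystemAvailable(fs)) {
        LOG.fatal("Shutting down: file system not available");
        closed = true;
        break;
      }
      try {
        Thread.sleep(30 * 1000);      // arbitrary polling interval
      } catch (InterruptedException e) {
        // ignored; loop re-checks availability
      }
    }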
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/MiniHBaseCluster.java Fri Sep 7 20:09:42 2007
@@ -310,6 +310,32 @@
   }
 
   /**
+   * Wait for Mini HBase Cluster to shut down.
+   */
+  public void join() {
+    if (regionThreads != null) {
+      synchronized(regionThreads) {
+        for(Thread t: regionThreads) {
+          if (t.isAlive()) {
+            try {
+              t.join();
+            } catch (InterruptedException e) {
+              // continue
+            }
+          }
+        }
+      }
+    }
+    if (masterThread != null && masterThread.isAlive()) {
+      try {
+        masterThread.join();
+      } catch(InterruptedException e) {
+        // continue
+      }
+    }
+  }
+
+  /**
    * Shut down HBase cluster started by calling
    * {@link #startMaster(Configuration)} and then
    * {@link #startRegionServers(Configuration, int)};
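Note that join() deliberately swallows InterruptedException, so a stray interrupt cannot stop it from waiting on the remaining threads, which is what a test tearing down a whole mini cluster wants. For code that should stay responsive to interruption, the usual alternative is to stop waiting and re-assert the flag. A sketch of that alternative, not part of this commit:

    try {
      t.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();  // let callers observe the interrupt
      return;                              // give up waiting on remaining threads
    }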
Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java?rev=573777&r1=573776&r2=573777&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/StaticTestEnvironment.java Fri Sep 7 20:09:42 2007
@@ -61,31 +61,32 @@
     value = System.getenv("DEBUGGING");
     if(value != null && value.equalsIgnoreCase("TRUE")) {
       debugging = true;
+    }
 
-      Logger rootLogger = Logger.getRootLogger();
-      // rootLogger.setLevel(Level.WARN);
+    Logger rootLogger = Logger.getRootLogger();
+    rootLogger.setLevel(Level.WARN);
 
-      Level logLevel = Level.INFO;
-      value = System.getenv("LOGGING_LEVEL");
-      if(value != null && value.length() != 0) {
-        if(value.equalsIgnoreCase("ALL")) {
-          logLevel = Level.ALL;
-        } else if(value.equalsIgnoreCase("DEBUG")) {
-          logLevel = Level.DEBUG;
-        } else if(value.equalsIgnoreCase("ERROR")) {
-          logLevel = Level.ERROR;
-        } else if(value.equalsIgnoreCase("FATAL")) {
-          logLevel = Level.FATAL;
-        } else if(value.equalsIgnoreCase("INFO")) {
-          logLevel = Level.INFO;
-        } else if(value.equalsIgnoreCase("OFF")) {
-          logLevel = Level.OFF;
-        } else if(value.equalsIgnoreCase("TRACE")) {
-          logLevel = Level.TRACE;
-        } else if(value.equalsIgnoreCase("WARN")) {
-          logLevel = Level.WARN;
-        }
+    Level logLevel = Level.DEBUG;
+    value = System.getenv("LOGGING_LEVEL");
+    if(value != null && value.length() != 0) {
+      if(value.equalsIgnoreCase("ALL")) {
+        logLevel = Level.ALL;
+      } else if(value.equalsIgnoreCase("DEBUG")) {
+        logLevel = Level.DEBUG;
+      } else if(value.equalsIgnoreCase("ERROR")) {
+        logLevel = Level.ERROR;
+      } else if(value.equalsIgnoreCase("FATAL")) {
+        logLevel = Level.FATAL;
+      } else if(value.equalsIgnoreCase("INFO")) {
+        logLevel = Level.INFO;
+      } else if(value.equalsIgnoreCase("OFF")) {
+        logLevel = Level.OFF;
+      } else if(value.equalsIgnoreCase("TRACE")) {
+        logLevel = Level.TRACE;
+      } else if(value.equalsIgnoreCase("WARN")) {
+        logLevel = Level.WARN;
       }
+    }
 
     ConsoleAppender consoleAppender = null;
     for(Enumeration e = rootLogger.getAllAppenders(); e.hasMoreElements();) {
@@ -103,8 +104,8 @@
           consoleLayout.setConversionPattern("%d %-5p [%t] %l: %m%n");
         }
       }
-      Logger.getLogger(
-        HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
     }
+    Logger.getLogger(
+      HBaseTestCase.class.getPackage().getName()).setLevel(logLevel);
   }
 }

Added: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java?rev=573777&view=auto
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java (added)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/TestDFSAbort.java Fri Sep 7 20:09:42 2007
@@ -0,0 +1,66 @@
+/**
+ * Copyright 2007 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase;
+
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+
+/**
+ * Test ability of HBase to handle DFS failure
+ */
+public class TestDFSAbort extends HBaseClusterTestCase {
+
+  /** constructor */
+  public TestDFSAbort() {
+    super();
+    conf.setInt("ipc.client.timeout", 5000);            // reduce ipc client timeout
+    conf.setInt("ipc.client.connect.max.retries", 5);   // and number of retries
+    conf.setInt("hbase.client.retries.number", 5);      // reduce HBase retries
+    Logger.getRootLogger().setLevel(Level.WARN);
+    Logger.getLogger(this.getClass().getPackage().getName()).setLevel(Level.DEBUG);
+  }
+
+  /** {@inheritDoc} */
+  @Override
+  public void setUp() throws Exception {
+    super.setUp();
+
+    HTableDescriptor desc = new HTableDescriptor(getName());
+    desc.addFamily(new HColumnDescriptor(HConstants.COLUMN_FAMILY_STR));
+
+    HBaseAdmin admin = new HBaseAdmin(conf);
+    admin.createTable(desc);
+  }
+
+  /**
+   * @throws Exception
+   */
+  public void testDFSAbort() throws Exception {
+
+    // By now the Mini DFS is running, Mini HBase is running and we have
+    // created a table. Now let's yank the rug out from HBase
+
+    cluster.getDFSCluster().shutdown();
+
+    // Now wait for Mini HBase Cluster to shut down
+
+    cluster.join();
+  }
+}
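As written, testDFSAbort passes only if every HBase daemon notices the dead DFS and exits, since cluster.join() otherwise blocks indefinitely. A bounded variant, sketched against a hypothetical timed wait that MiniHBaseCluster does not currently expose, would turn a hung shutdown into a test failure instead of a stalled build:

    // Hypothetical: would require MiniHBaseCluster to expose its threads.
    masterThread.join(60 * 1000);          // wait at most one minute
    assertFalse("master did not shut down", masterThread.isAlive());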