hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r798679 - in /hadoop/hbase/trunk: ./ src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/ src/java/org/apache/hadoop/hbase/ src/java/org/apache/hadoop/hbase/client/ src/java/org/apache/hadoop/hbase/io/hfile/ src/...
Date Tue, 28 Jul 2009 19:41:55 GMT
Author: stack
Date: Tue Jul 28 19:41:54 2009
New Revision: 798679

URL: http://svn.apache.org/viewvc?rev=798679&view=rev
Log:
HBASE-1671 HBASE-1609 broke scanners riding across splits

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
    hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Tue Jul 28 19:41:54 2009
@@ -295,6 +295,7 @@
                (Tim Sell and Ryan Rawson via Stack)
    HBASE-1703  ICVs across /during a flush can cause multiple keys with the 
                same TS (bad)
+   HBASE-1671  HBASE-1609 broke scanners riding across splits
 
   IMPROVEMENTS
    HBASE-1089  Add count of regions on filesystem to master UI; add percentage

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
(original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
Tue Jul 28 19:41:54 2009
@@ -31,9 +31,8 @@
   private TransactionState transactionState;
 
   TransactionScannerCallable(final TransactionState transactionState,
-      final HConnection connection, final byte[] tableName,
-      final byte[] startRow, Scan scan) {
-    super(connection, tableName,  startRow, scan);
+      final HConnection connection, final byte[] tableName, Scan scan) {
+    super(connection, tableName,  scan);
     this.transactionState = transactionState;
   }
 

Modified: hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
(original)
+++ hadoop/hbase/trunk/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
Tue Jul 28 19:41:54 2009
@@ -186,7 +186,7 @@
         final byte[] localStartKey, int caching) {
       TransactionScannerCallable t = 
           new TransactionScannerCallable(transactionState, getConnection(),
-          getTableName(), getScan().getStartRow(), getScan());
+          getTableName(), getScan());
       t.setCaching(caching);
       return t;
     }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/DoNotRetryIOException.java Tue Jul
28 19:41:54 2009
@@ -42,4 +42,12 @@
   public DoNotRetryIOException(String message) {
     super(message);
   }
+
+  /**
+   * @param message
+   * @param cause
+   */
+  public DoNotRetryIOException(String message, Throwable cause) {
+    super(message, cause);
+  }
 }
\ No newline at end of file

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HConnectionManager.java Tue
Jul 28 19:41:54 2009
@@ -65,8 +65,6 @@
  * Used by {@link HTable} and {@link HBaseAdmin}
  */
 public class HConnectionManager implements HConstants {
-  private static final Log LOG = LogFactory.getLog(HConnectionManager.class);
-
   /*
    * Not instantiable.
    */
@@ -94,7 +92,6 @@
       if (connection == null) {
         connection = new TableServers(conf);
         HBASE_INSTANCES.put(conf, connection);
-        LOG.debug("Created new HBASE_INSTANCES");
       }
     }
     return connection;
@@ -131,7 +128,7 @@
 
   /* Encapsulates finding the servers for an HBase instance */
   private static class TableServers implements ServerConnection, HConstants, Watcher {
-    private static final Log LOG = LogFactory.getLog(TableServers.class);
+    static final Log LOG = LogFactory.getLog(TableServers.class);
     private final Class<? extends HRegionInterface> serverInterfaceClass;
     private final long pause;
     private final int numRetries;
@@ -353,8 +350,7 @@
       MetaScannerVisitor visitor = new MetaScannerVisitor() {
         public boolean processRow(Result result) throws IOException {
           try {
-            byte[] value =
-              result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
+            byte[] value = result.getValue(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
             HRegionInfo info = null;
             if (value != null) {
               info = Writables.getHRegionInfo(value);
@@ -411,9 +407,7 @@
       scan.addColumn(CATALOG_FAMILY, REGIONINFO_QUALIFIER);
       ScannerCallable s = new ScannerCallable(this, 
           (Bytes.equals(tableName, HConstants.META_TABLE_NAME) ?
-              HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME),
-          scan.getStartRow(),
-           scan);
+              HConstants.ROOT_TABLE_NAME : HConstants.META_TABLE_NAME), scan);
       try {
         // Open scanner
         getRegionServerWithRetries(s);
@@ -542,7 +536,7 @@
       */
     private HRegionLocation locateRegionInMeta(final byte [] parentTable,
       final byte [] tableName, final byte [] row, boolean useCache)
-    throws IOException{
+    throws IOException {
       HRegionLocation location = null;
       // If supposed to be using the cache, then check it for a possible hit.
       // Otherwise, delete any existing cached location so it won't interfere.
@@ -969,7 +963,7 @@
           throw (DoNotRetryIOException) t;
         }
       }
-      return null;    
+      return null;
     }
 
     private HRegionLocation

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/HTable.java Tue Jul 28 19:41:54
2009
@@ -29,6 +29,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -36,6 +37,7 @@
 import org.apache.hadoop.hbase.HServerAddress;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.filter.RowFilterInterface;
@@ -54,7 +56,6 @@
 /**
  * Used to communicate with a single HBase table
  * TODO: checkAndSave in oldAPI
- * TODO: Converting filters
  * TODO: Regex deletes.
  */
 public class HTable {
@@ -1778,13 +1779,18 @@
    */
   protected class ClientScanner implements ResultScanner {
     private final Log CLIENT_LOG = LogFactory.getLog(this.getClass());
+    // HEADSUP: The scan internal start row can change as we move through table.
     private Scan scan;
     private boolean closed = false;
+    // Current region scanner is against.  Gets cleared if current region goes
+    // wonky: e.g. if it splits on us.
     private HRegionInfo currentRegion = null;
     private ScannerCallable callable = null;
     private final LinkedList<Result> cache = new LinkedList<Result>();
-    private final int scannerCaching = HTable.this.scannerCaching;
+    private final int caching = HTable.this.scannerCaching;
     private long lastNext;
+    // Keep lastResult returned successfully in case we have to reset scanner.
+    private Result lastResult = null;
     
     protected ClientScanner(final Scan scan) {
       if (CLIENT_LOG.isDebugEnabled()) {
@@ -1804,7 +1810,7 @@
     }
 
     public void initialize() throws IOException {
-      nextScanner(this.scannerCaching);
+      nextScanner(this.caching);
     }
 
     protected Scan getScan() {
@@ -1814,10 +1820,12 @@
     protected long getTimestamp() {
       return lastNext;
     }
-    
+
     /*
-     * Gets a scanner for the next region.
-     * Returns false if there are no more scanners.
+     * Gets a scanner for the next region.  If this.currentRegion != null, then
+     * we will move to the endrow of this.currentRegion.  Else we will get
+     * scanner at the scan.getStartRow().
+     * @param nbRows
      */
     private boolean nextScanner(int nbRows) throws IOException {
       // Close the previous scanner if it's open
@@ -1826,38 +1834,38 @@
         getConnection().getRegionServerWithRetries(callable);
         this.callable = null;
       }
-
+      
+      // Where to start the next scanner
+      byte [] localStartKey = null;
+      
       // if we're at the end of the table, then close and return false
       // to stop iterating
-      if (currentRegion != null) {
+      if (this.currentRegion != null) {
         if (CLIENT_LOG.isDebugEnabled()) {
-          CLIENT_LOG.debug("Advancing forward from region " + currentRegion);
+          CLIENT_LOG.debug("Finished with region " + this.currentRegion);
         }
-
-        byte [] endKey = currentRegion.getEndKey();
+        byte [] endKey = this.currentRegion.getEndKey();
         if (endKey == null ||
             Bytes.equals(endKey, HConstants.EMPTY_BYTE_ARRAY) ||
             filterSaysStop(endKey)) {
           close();
           return false;
         }
-      } 
-
-      HRegionInfo oldRegion = this.currentRegion;
-      byte [] localStartKey = 
-        oldRegion == null ? scan.getStartRow() : oldRegion.getEndKey();
+        localStartKey = endKey;
+      } else {
+        localStartKey = this.scan.getStartRow();
+      }
 
       if (CLIENT_LOG.isDebugEnabled()) {
         CLIENT_LOG.debug("Advancing internal scanner to startKey at '" +
           Bytes.toStringBinary(localStartKey) + "'");
-      }
-            
+      }            
       try {
         callable = getScannerCallable(localStartKey, nbRows);
-        // open a scanner on the region server starting at the 
+        // Open a scanner on the region server starting at the 
         // beginning of the region
         getConnection().getRegionServerWithRetries(callable);
-        currentRegion = callable.getHRegionInfo();
+        this.currentRegion = callable.getHRegionInfo();
       } catch (IOException e) {
         close();
         throw e;
@@ -1867,8 +1875,9 @@
     
     protected ScannerCallable getScannerCallable(byte [] localStartKey,
         int nbRows) {
+      scan.setStartRow(localStartKey);
       ScannerCallable s = new ScannerCallable(getConnection(), 
-          getTableName(), localStartKey, scan);
+        getTableName(), scan);
       s.setCaching(nbRows);
       return s;
     }
@@ -1915,13 +1924,35 @@
       }
       if (cache.size() == 0) {
         Result [] values = null;
-        int countdown = this.scannerCaching;
+        int countdown = this.caching;
         // We need to reset it if it's a new callable that was created 
         // with a countdown in nextScanner
-        callable.setCaching(this.scannerCaching);
+        callable.setCaching(this.caching);
+        // This flag is set when we want to skip the result returned.  We do
+        // this when we reset scanner because it split under us.
+        boolean skipFirst = false;
         do {
           try {
             values = getConnection().getRegionServerWithRetries(callable);
+            if (skipFirst) {
+              skipFirst = false;
+              // Reget.
+              values = getConnection().getRegionServerWithRetries(callable);
+            }
+          } catch (DoNotRetryIOException e) {
+            Throwable cause = e.getCause();
+            if (cause == null || !(cause instanceof NotServingRegionException)) {
+              throw e;
+            }
+            // Else, its signal from depths of ScannerCallable that we got an
+            // NSRE on a next and that we need to reset the scanner.
+            this.scan.setStartRow(this.lastResult.getRow());
+            // Clear region as flag to nextScanner to use this.scan.startRow.
+            this.currentRegion = null;
+            // Skip first row returned.  We already let it out on previous
+            // invocation.
+            skipFirst = true;
+            continue;
           } catch (IOException e) {
             if (e instanceof UnknownScannerException &&
                 lastNext + scannerTimeout < System.currentTimeMillis()) {
@@ -1936,6 +1967,7 @@
             for (Result rs : values) {
               cache.add(rs);
               countdown--;
+              this.lastResult = rs;
             }
           }
         } while (countdown > 0 && nextScanner(countdown));
@@ -1965,7 +1997,7 @@
       }
       return resultSets.toArray(new Result[resultSets.size()]);
     }
-    
+
     public void close() {
       if (callable != null) {
         callable.setClose();
@@ -1998,7 +2030,7 @@
               return next != null;
             } catch (IOException e) {
               throw new RuntimeException(e);
-            }            
+            }
           }
           return true;
         }

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/MetaScanner.java Tue Jul 28
19:41:54 2009
@@ -49,7 +49,7 @@
     ScannerCallable callable = null;
     do {
       Scan scan = new Scan(startRow).addFamily(CATALOG_FAMILY);
-      callable = new ScannerCallable(connection, META_TABLE_NAME, scan.getStartRow(), scan);
+      callable = new ScannerCallable(connection, META_TABLE_NAME, scan);
       // Open scanner
       connection.getRegionServerWithRetries(callable);
       try {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/client/ScannerCallable.java Tue Jul
28 19:41:54 2009
@@ -1,3 +1,4 @@
+
 /**
  * Copyright 2008 The Apache Software Foundation
  *
@@ -22,7 +23,12 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.ipc.RemoteException;
+import org.mortbay.log.Log;
 
 
 /**
@@ -34,20 +40,16 @@
   private boolean instantiated = false;
   private boolean closed = false;
   private Scan scan;
-  private byte [] startRow;
   private int caching = 1;
 
   /**
    * @param connection
    * @param tableName
-   * @param startRow
    * @param scan
    */
-  public ScannerCallable (HConnection connection, byte [] tableName,
-      byte [] startRow, Scan scan) {
-    super(connection, tableName, startRow);
+  public ScannerCallable (HConnection connection, byte [] tableName, Scan scan) {
+    super(connection, tableName, scan.getStartRow());
     this.scan = scan;
-    this.startRow = startRow;
   }
   
   /**
@@ -67,18 +69,42 @@
    */
   public Result [] call() throws IOException {
     if (scannerId != -1L && closed) {
-      server.close(scannerId);
-      scannerId = -1L;
+      close();
     } else if (scannerId == -1L && !closed) {
-      // open the scanner
-      scannerId = openScanner();
+      this.scannerId = openScanner();
     } else {
-      Result [] rrs = server.next(scannerId, caching);
+      Result [] rrs = null;
+      try {
+        rrs = server.next(scannerId, caching);
+      } catch (IOException e) {
+    	IOException ioe = null;
+        if (e instanceof RemoteException) {
+          ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
+        }
+        if (ioe != null && ioe instanceof NotServingRegionException) {
+          // Throw a DNRE so that we break out of cycle of calling NSRE
+          // when what we need is to open scanner against new location.
+          // Attach NSRE to signal client that it needs to resetup scanner.
+          throw new DoNotRetryIOException("Reset scanner", ioe);
+        }
+      }
       return rrs == null || rrs.length == 0? null: rrs;
     }
     return null;
   }
   
+  private void close() {
+	if (this.scannerId == -1L) {
+	  return;
+	}
+	try {
+		this.server.close(this.scannerId);
+	} catch (IOException e) {
+		Log.warn("Ignore, probably already closed", e);
+	}
+	this.scannerId = -1L;
+  }
+
   protected long openScanner() throws IOException {
     return server.openScanner(
         this.location.getRegionInfo().getRegionName(), scan);

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Jul 28 19:41:54
2009
@@ -729,7 +729,7 @@
     }
 
     protected String toStringLastKey() {
-      return KeyValue.keyToString(getFirstKey());
+      return KeyValue.keyToString(getLastKey());
     }
 
     public long length() {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HLog.java Tue Jul 28
19:41:54 2009
@@ -188,7 +188,7 @@
    * @param listener
    * @throws IOException
    */
-  public HLog(final FileSystem fs, final Path dir, final Configuration conf,
+  public HLog(final FileSystem fs, final Path dir, final HBaseConfiguration conf,
     final LogRollListener listener)
   throws IOException {
     super();
@@ -219,7 +219,7 @@
       ", optionallogflushinternal=" + this.optionalFlushInterval + "ms");
     rollWriter();
     // Test if syncfs is available.
-    this.append = conf.getBoolean("dfs.support.append", false);
+    this.append = isAppend(conf);
     Method m = null;
     if (this.append) {
       try {
@@ -784,7 +784,7 @@
    * @throws IOException
    */
   public static List<Path> splitLog(final Path rootDir, final Path srcDir,
-      final FileSystem fs, final Configuration conf)
+      final FileSystem fs, final HBaseConfiguration conf)
   throws IOException {
     long millis = System.currentTimeMillis();
     List<Path> splits = null;
@@ -833,7 +833,8 @@
    * @return List of splits made.
    */
   private static List<Path> splitLog(final Path rootDir,
-    final FileStatus [] logfiles, final FileSystem fs, final Configuration conf)
+    final FileStatus [] logfiles, final FileSystem fs,
+    final HBaseConfiguration conf)
   throws IOException {
     final Map<byte [], WriterAndPath> logWriters =
       new TreeMap<byte [], WriterAndPath>(Bytes.BYTES_COMPARATOR);
@@ -848,11 +849,12 @@
     // More means faster but bigger mem consumption  */
     int concurrentLogReads =
       conf.getInt("hbase.regionserver.hlog.splitlog.reader.threads", 3);
-
+    // Is append supported?
+    boolean append = isAppend(conf);
     try {
       int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) / 
           concurrentLogReads)).intValue();
-      for(int step = 0; step < maxSteps; step++) {
+      for (int step = 0; step < maxSteps; step++) {
         final Map<byte[], LinkedList<HLogEntry>> logEntries = 
           new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
         // Stop at logfiles.length when it's the last step
@@ -867,7 +869,6 @@
             LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
               ": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
           }
-          boolean append = conf.getBoolean("dfs.support.append", false);
           recoverLog(fs, logfiles[i].getPath(), append);
           SequenceFile.Reader in = null;
           int count = 0;
@@ -1022,6 +1023,24 @@
   }
 
   /**
+   * @param conf
+   * @return True if append enabled and we have the syncFs in our path.
+   */
+  private static boolean isAppend(final HBaseConfiguration conf) {
+      boolean append = conf.getBoolean("dfs.support.append", false);
+      if (append) {
+        try {
+          SequenceFile.Writer.class.getMethod("syncFs", new Class<?> []{});
+          append = true;
+        } catch (SecurityException e) {
+        } catch (NoSuchMethodException e) {
+          append = false;
+        }
+      }
+      return append;
+    }
+
+  /**
    * Utility class that lets us keep track of the edit with it's key
    * Only used when splitting logs
    */
@@ -1158,10 +1177,9 @@
         System.exit(-1);
       }
     }
-    Configuration conf = new HBaseConfiguration();
+    HBaseConfiguration conf = new HBaseConfiguration();
     FileSystem fs = FileSystem.get(conf);
     Path baseDir = new Path(conf.get(HBASE_DIR));
-
     for (int i = 1; i < args.length; i++) {
       Path logPath = new Path(args[i]);
       if (!fs.exists(logPath)) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Jul
28 19:41:54 2009
@@ -110,7 +110,7 @@
    * master as a region to close if the carrying regionserver is overloaded.
    * Once set, it is never cleared.
    */
-  private final AtomicBoolean closing = new AtomicBoolean(false);
+  final AtomicBoolean closing = new AtomicBoolean(false);
   private final RegionHistorian historian;
 
   //////////////////////////////////////////////////////////////////////////////
@@ -1671,8 +1671,6 @@
     return this.basedir;
   }
 
-  
-  //TODO
   /**
    * RegionScanner is an iterator through a bunch of rows in an HRegion.
    * <p>
@@ -1710,9 +1708,15 @@
      * Get the next row of results from this region.
      * @param results list to append results to
      * @return true if there are more rows, false if scanner is done
+     * @throws NotServerRegionException If this region is closing or closed
      */
     public boolean next(List<KeyValue> results)
     throws IOException {
+      if (closing.get() || closed.get()) {
+        close();
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() +
+          " is closing=" + closing.get() + " or closed=" + closed.get());
+      }
       // This method should probably be reorganized a bit... has gotten messy
       KeyValue kv = this.storeHeap.peek();
       if (kv == null) {

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue
Jul 28 19:41:54 2009
@@ -803,7 +803,7 @@
    */
   private Throwable cleanup(final Throwable t, final String msg) {
     if (msg == null) {
-      LOG.error(RemoteExceptionHandler.checkThrowable(t));
+      LOG.error("", RemoteExceptionHandler.checkThrowable(t));
     } else {
       LOG.error(msg, RemoteExceptionHandler.checkThrowable(t));
     }
@@ -1890,7 +1890,7 @@
   public Result [] next(final long scannerId, int nbRows) throws IOException {
     try {
       String scannerName = String.valueOf(scannerId);
-      InternalScanner s = scanners.get(scannerName);
+      InternalScanner s = this.scanners.get(scannerName);
       if (s == null) {
         throw new UnknownScannerException("Name: " + scannerName);
       }
@@ -1918,6 +1918,9 @@
       }
       return results.toArray(new Result[0]);
     } catch (Throwable t) {
+      if (t instanceof NotServingRegionException) {
+        this.scanners.remove(scannerId);
+      }
       throw convertThrowableToIOE(cleanup(t));
     }
   } 
@@ -1978,9 +1981,9 @@
       boolean writeToWAL = true;
       this.cacheFlusher.reclaimMemStoreMemory();
       this.requestCount.incrementAndGet();
-      Integer lock = getLockFromId(delete.getLockId());
+      Integer lid = getLockFromId(delete.getLockId());
       HRegion region = getRegion(regionName);
-      region.delete(delete, lock, writeToWAL);
+      region.delete(delete, lid, writeToWAL);
     } catch(WrongRegionException ex) {
     } catch (NotServingRegionException ex) {
     } catch (Throwable t) {

Modified: hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java?rev=798679&r1=798678&r2=798679&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java (original)
+++ hadoop/hbase/trunk/src/test/org/apache/hadoop/hbase/client/TestForceSplit.java Tue Jul
28 19:41:54 2009
@@ -21,7 +21,10 @@
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HBaseClusterTestCase;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -34,6 +37,7 @@
  * Tests forced splitting of HTable
  */
 public class TestForceSplit extends HBaseClusterTestCase {
+  static final Log LOG = LogFactory.getLog(TestForceSplit.class);
   private static final byte[] tableName = Bytes.toBytes("test");
   private static final byte[] columnName = Bytes.toBytes("a:");
 
@@ -44,7 +48,7 @@
   }
 
   /**
-   * the test
+   * Tests forcing split from client and having scanners successfully ride over split.
    * @throws Exception 
    * @throws IOException
    */
@@ -55,7 +59,7 @@
     htd.addFamily(new HColumnDescriptor(columnName));
     HBaseAdmin admin = new HBaseAdmin(conf);
     admin.createTable(htd);
-    HTable table = new HTable(conf, tableName);
+    final HTable table = new HTable(conf, tableName);
     byte[] k = new byte[3];
     int rowCount = 0;
     for (byte b1 = 'a'; b1 < 'z'; b1++) {
@@ -88,31 +92,50 @@
     scanner.close();
     assertEquals(rowCount, rows);
     
+    // Have an outstanding scan going on to make sure we can scan over splits.
+    scan = new Scan();
+    scanner = table.getScanner(scan);
+    // Scan first row so we are into first region before split happens.
+    scanner.next();
+
+    final AtomicInteger count = new AtomicInteger(0);
+    Thread t = new Thread("CheckForSplit") {
+      public void run() {
+        for (int i = 0; i < 20; i++) {
+          try {
+            sleep(1000);
+          } catch (InterruptedException e) {
+            continue;
+          }
+          // check again    table = new HTable(conf, tableName);
+          Map<HRegionInfo, HServerAddress> regions = null;
+          try {
+            regions = table.getRegionsInfo();
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+          if (regions == null) continue;
+          count.set(regions.size());
+          if (count.get() >= 2) break;
+          LOG.debug("Cycle waiting on split");
+        }
+      }
+    };
+    t.start();
     // tell the master to split the table
     admin.split(Bytes.toString(tableName));
+    t.join();
 
-    // give some time for the split to happen
-    Thread.sleep(15 * 1000);
-
-    // check again    table = new HTable(conf, tableName);
-    m = table.getRegionsInfo();
-    System.out.println("Regions after split (" + m.size() + "): " + m);
-    // should have two regions now
-    assertTrue(m.size() == 2);
-    
     // Verify row count
-    scan = new Scan();
-    scanner = table.getScanner(scan);
-    rows = 0;
-    for(Result result : scanner) {
+    rows = 1; // We counted one row above.
+    for (Result result : scanner) {
       rows++;
-      if(rows > rowCount) {
+      if (rows > rowCount) {
         scanner.close();
-        assertTrue("Have already scanned more rows than expected (" + 
-            rowCount + ")", false);
+        assertTrue("Scanned more than expected (" + rowCount + ")", false);
       }
     }
     scanner.close();
     assertEquals(rowCount, rows);
   }
-}
+}
\ No newline at end of file



Mime
View raw message