hbase-commits mailing list archives

From mberto...@apache.org
Subject [2/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException
Date Fri, 15 May 2015 18:15:17 GMT
HBASE-13651 Handle StoreFileScanner FileNotFoundException
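
The change lets a scan survive a store file that disappears underneath it: StoreFileScanner rethrows FileNotFoundException unwrapped, and RegionScannerImpl catches it, forces a refresh of the region's store files via the new refreshStoreFiles(true), and surfaces a plain IOException so the caller can retry. If the refresh itself fails, the region server is aborted and the region is reported as closing. The new TestCorruptedRegionStoreFile removes a store file both before and during a scan to exercise the recovery path.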


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6968834c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6968834c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6968834c

Branch: refs/heads/branch-1
Commit: 6968834c9c96c103e3a87f1be0dace49f2c9461e
Parents: 41aceca
Author: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Authored: Fri May 15 19:04:22 2015 +0100
Committer: Matteo Bertozzi <matteo.bertozzi@cloudera.com>
Committed: Fri May 15 19:04:22 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      | 114 ++++++---
 .../hbase/regionserver/StoreFileScanner.java    |   9 +
 .../TestCorruptedRegionStoreFile.java           | 249 +++++++++++++++++++
 3 files changed, 333 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 66c09a8..1bb0865 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -1108,7 +1108,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   public long getNumMutationsWithoutWAL() {
     return numMutationsWithoutWAL.get();
   }
-  
+
   @Override
   public long getDataInMemoryWithoutWAL() {
     return dataInMemoryWithoutWAL.get();
@@ -2365,7 +2365,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     LOG.info(msg);
     status.setStatus(msg);
 
-    return new FlushResultImpl(compactionRequested ? 
+    return new FlushResultImpl(compactionRequested ?
         FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
           FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED,
         flushOpSeqId);
@@ -4721,7 +4721,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean refreshStoreFiles() throws IOException {
-    if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
+    return refreshStoreFiles(false);
+  }
+
+  protected boolean refreshStoreFiles(boolean force) throws IOException {
+    if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) {
       return false; // if primary nothing to do
     }
 
@@ -5200,14 +5204,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
      * If the joined heap data gathering is interrupted due to scan limits, this will
      * contain the row for which we are populating the values.*/
     protected Cell joinedContinuationRow = null;
+    private boolean filterClosed = false;
+
+    protected final int isScan;
     protected final byte[] stopRow;
+    protected final HRegion region;
+
+    private final long readPt;
+    private final long maxResultSize;
+    private final ScannerContext defaultScannerContext;
     private final FilterWrapper filter;
-    private ScannerContext defaultScannerContext;
-    protected int isScan;
-    private boolean filterClosed = false;
-    private long readPt;
-    private long maxResultSize;
-    protected HRegion region;
 
     @Override
     public HRegionInfo getRegionInfo() {
@@ -5216,7 +5222,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     RegionScannerImpl(Scan scan, List<KeyValueScanner> additionalScanners, HRegion region)
         throws IOException {
-
       this.region = region;
       this.maxResultSize = scan.getMaxResultSize();
       if (scan.hasFilter()) {
@@ -5257,10 +5262,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         scanners.addAll(additionalScanners);
       }
 
-      for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
-          scan.getFamilyMap().entrySet()) {
+      for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
         Store store = stores.get(entry.getKey());
-        KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        KeyValueScanner scanner;
+        try {
+          scanner = store.getScanner(scan, entry.getValue(), this.readPt);
+        } catch (FileNotFoundException e) {
+          throw handleFileNotFound(e);
+        }
         if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || this.filter.isFamilyEssential(entry.getKey())) {
           scanners.add(scanner);
@@ -5352,7 +5361,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         moreValues = nextInternal(tmpList, scannerContext);
         outResults.addAll(tmpList);
       }
-      
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -5403,30 +5412,33 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       boolean tmpKeepProgress = scannerContext.getKeepProgress();
       // Scanning between column families and thus the scope is between cells
       LimitScope limitScope = LimitScope.BETWEEN_CELLS;
-      do {
-        // We want to maintain any progress that is made towards the limits while scanning across
-        // different column families. To do this, we toggle the keep progress flag on during calls
-        // to the StoreScanner to ensure that any progress made thus far is not wiped away.
-        scannerContext.setKeepProgress(true);
-        heap.next(results, scannerContext);
-        scannerContext.setKeepProgress(tmpKeepProgress);
-
-        nextKv = heap.peek();
-        moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
-
-        if (scannerContext.checkBatchLimit(limitScope)) {
-          return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
-        } else if (scannerContext.checkSizeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        } else if (scannerContext.checkTimeLimit(limitScope)) {
-          ScannerContext.NextState state =
-              moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
-          return scannerContext.setScannerState(state).hasMoreValues();
-        }
-      } while (moreCellsInRow);
-
+      try {
+        do {
+          // We want to maintain any progress that is made towards the limits while scanning across
+          // different column families. To do this, we toggle the keep progress flag on during calls
+          // to the StoreScanner to ensure that any progress made thus far is not wiped away.
+          scannerContext.setKeepProgress(true);
+          heap.next(results, scannerContext);
+          scannerContext.setKeepProgress(tmpKeepProgress);
+
+          nextKv = heap.peek();
+          moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length);
+
+          if (scannerContext.checkBatchLimit(limitScope)) {
+            return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues();
+          } else if (scannerContext.checkSizeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          } else if (scannerContext.checkTimeLimit(limitScope)) {
+            ScannerContext.NextState state =
+                moreCellsInRow ? NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED;
+            return scannerContext.setScannerState(state).hasMoreValues();
+          }
+        } while (moreCellsInRow);
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
+      }
       return nextKv != null;
     }
 
@@ -5753,18 +5765,42 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       }
       boolean result = false;
       startRegionOperation();
+      KeyValue kv = KeyValueUtil.createFirstOnRow(row);
       try {
-        KeyValue kv = KeyValueUtil.createFirstOnRow(row);
         // use request seek to make use of the lazy seek option. See HBASE-5520
         result = this.storeHeap.requestSeek(kv, true, true);
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        throw handleFileNotFound(e);
       } finally {
         closeRegionOperation();
       }
       return result;
     }
+
+    private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException {
+      // tries to refresh the store files, otherwise shutdown the RS.
+      // TODO: add support for abort() of a single region and trigger reassignment.
+      try {
+        region.refreshStoreFiles(true);
+        return new IOException("unable to read store file");
+      } catch (IOException e) {
+        String msg = "a store file got lost: " + fnfe.getMessage();
+        LOG.error(msg);
+        LOG.error("unable to refresh store files", e);
+        abortRegionServer(msg);
+        return new NotServingRegionException(getRegionInfo().getRegionNameAsString() +" is closing");
+      }
+    }
+
+    private void abortRegionServer(String msg) throws IOException {
+      if (rsServices instanceof HRegionServer) {
+        ((HRegionServer)rsServices).abort(msg);
+      }
+      throw new UnsupportedOperationException("not able to abort RS after: " + msg);
+    }
   }
 
   // Utility methods
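
For readers following the flow: after this change a reader that trips over the lost file gets a plain IOException (produced by handleFileNotFound above) once the store files have been refreshed, rather than a scanner that is permanently wedged. A minimal, hypothetical client-side sketch of what that enables, assuming an open Table; the retry policy here is illustrative and not part of this patch:

    byte[] lastRow = null;
    ResultScanner scanner = table.getScanner(new Scan());
    try {
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        lastRow = result.getRow();   // remember progress in case the scan fails
      }
    } catch (IOException e) {
      // illustrative retry: by the time this surfaces, the server has already
      // refreshed its store files, so a fresh scanner can complete the scan.
      // setStartRow is inclusive, so lastRow is read again.
      scanner.close();
      Scan resume = new Scan();
      if (lastRow != null) resume.setStartRow(lastRow);
      scanner = table.getScanner(resume);
      for (Result result = scanner.next(); result != null; result = scanner.next()) {
        lastRow = result.getRow();
      }
    } finally {
      scanner.close();
    }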

http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index c27b455..961352d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -142,6 +143,8 @@ public class StoreFileScanner implements KeyValueScanner {
           skipKVsNewerThanReadpoint();
         }
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -168,6 +171,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -192,6 +197,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key,
           ioe);
@@ -453,6 +460,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seekToPreviousRow " + this + " to key "
           + key, ioe);
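
Note the pattern repeated in each hunk above: since FileNotFoundException is a subclass of IOException, the narrower catch clause must come first, and rethrowing it untouched keeps the existing handlers from wrapping it in a generic IOException. That preserved type is what lets handleFileNotFound() in RegionScannerImpl (see the HRegion.java change above) recognize the lost file and trigger the store-file refresh.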

http://git-wip-us.apache.org/repos/asf/hbase/blob/6968834c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..dce19d6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category(LargeTests.class)
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final TableName tableName) throws IOException {
+    // load the table
+    Table table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTableDir(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.build(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+
+    setupTable(TEST_TABLE.getTableName());
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileDuringScan() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(Table table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=180000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName()));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(TEST_TABLE.getTableName(), new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(Table table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count + " lostRows=" + (NUM_ROWS - count),
+               count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final TableName tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final TableName tableName, final ScanInjector injector)
+      throws Exception {
+    Table table = UTIL.getConnection().getTable(tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % (ROW_PER_FILE / 2)) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(Table table, Scan scan) throws Exception {}
+    protected void beforeScanNext(Table table) throws Exception {}
+    protected void afterScanNext(Table table, Result result) throws Exception {}
+    protected void afterScan(Table table) throws Exception {}
+  }
+}
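
A note on the test mechanics: both cases move the first store file to the local filesystem (fs.copyToLocalFile(true, ...) deletes the source) and evict its blocks from every region server's block cache, so subsequent reads must hit the missing file. testLosingFileDuringScan does this from beforeScanNext(), i.e. while the scan is in flight; testLosingFileAfterScannerInit does it from beforeScan(), before the scanner is opened. Either way the assertion tolerates losing at most one file's worth of rows (ROW_PER_FILE = 2000).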

