From: mbertozzi@apache.org
To: commits@hbase.apache.org
Reply-To: dev@hbase.apache.org
Date: Fri, 15 May 2015 18:15:19 -0000
Message-Id: <1055d2ad07734a68af2d5cb0801e24d7@git.apache.org>
In-Reply-To: <957ba86b63a447199164d37a5deef062@git.apache.org>
References: <957ba86b63a447199164d37a5deef062@git.apache.org>
Subject: [4/4] hbase git commit: HBASE-13651 Handle StoreFileScanner FileNotFoundException

HBASE-13651 Handle StoreFileScanner FileNotFoundException

Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca254ed5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca254ed5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca254ed5

Branch: refs/heads/0.94
Commit: ca254ed51514d2f14fc1f751778fb82df423aab8
Parents: fb43461
Author: Matteo Bertozzi
Authored: Fri May 15 19:09:40 2015 +0100
Committer: Matteo Bertozzi
Committed: Fri May 15 19:09:40 2015 +0100

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  37 ++-
 .../hbase/regionserver/HRegionServer.java       |   4 +
 .../apache/hadoop/hbase/regionserver/Store.java |   1 +
 .../hbase/regionserver/StoreFileScanner.java    |   7 +
 .../TestCorruptedRegionStoreFile.java           | 244 +++++++++++++++++++
 5 files changed, 285 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5da00dc..8f20376 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -4024,7 +4024,13 @@ public class HRegion implements HeapSize { // , Writable{
     for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
       Store store = stores.get(entry.getKey());
-      KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
+      KeyValueScanner scanner;
+      try {
+        scanner = store.getScanner(scan, entry.getValue());
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+      }
       if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
           || FilterBase.isFamilyEssential(this.filter, entry.getKey())) {
         scanners.add(scanner);
@@ -4152,13 +4158,18 @@ public class HRegion implements HeapSize { // , Writable{
   private KeyValue populateResult(List<KeyValue> results, KeyValueHeap heap, int limit,
       byte[] currentRow, int offset, short length, String metric) throws IOException {
     KeyValue nextKv;
-    do {
-      heap.next(results, limit - results.size(), metric);
-      if (limit > 0 && results.size() == limit) {
-        return KV_LIMIT;
-      }
-      nextKv = heap.peek();
-    } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+    try {
+      do {
+        heap.next(results, limit - results.size(), metric);
+        if (limit > 0 && results.size() == limit) {
+          return KV_LIMIT;
+        }
+        nextKv = heap.peek();
+      } while (nextKv != null && nextKv.matchingRow(currentRow, offset, length));
+    } catch (FileNotFoundException e) {
+      abortRegionServer(e.getMessage());
+      throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
+    }
     return nextKv;
   }
@@ -4368,6 +4379,9 @@ public class HRegion implements HeapSize { // , Writable{
         if (this.joinedHeap != null) {
           result = this.joinedHeap.requestSeek(kv, true, true) || result;
         }
+      } catch (FileNotFoundException e) {
+        abortRegionServer(e.getMessage());
+        throw new NotServingRegionException(regionInfo.getRegionNameAsString() + " is closing");
       } finally {
         closeRegionOperation();
       }
@@ -6034,6 +6048,13 @@ public class HRegion implements HeapSize { // , Writable{
       lock.readLock().unlock();
     }
 
+  public void abortRegionServer(String msg) throws IOException {
+    RegionServerServices rs = getRegionServerServices();
+    if (rs instanceof HRegionServer) {
+      ((HRegionServer)rs).abort(msg);
+    }
+  }
+
   /**
    * This method needs to be called before any public call that reads or
    * modifies stores in bulk. It has to be called just before a try.
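
Note on the HRegion.java hunks above: all three call sites apply the same fail-fast pattern. A FileNotFoundException raised while opening or iterating store file scanners means the files on disk no longer match the server's in-memory state, so the new abortRegionServer() helper takes the whole region server down and the caller throws NotServingRegionException, which HBase clients treat as retryable. Below is a minimal, self-contained sketch of that pattern; Server, RegionScanner, and RetryableException are illustrative stand-ins, not the actual HBase classes.

----------------------------------------------------------------------
import java.io.FileNotFoundException;
import java.io.IOException;

public class FailFastSketch {
  // Stand-in for the region server: aborting takes the whole server down
  // so the master can reassign its regions from consistent on-disk state.
  interface Server {
    void abort(String reason);
  }

  // Retryable marker, playing the role of NotServingRegionException.
  static class RetryableException extends IOException {
    RetryableException(String msg) { super(msg); }
  }

  static class RegionScanner {
    private final Server server;
    RegionScanner(Server server) { this.server = server; }

    // On FileNotFoundException the server's view of the store files is
    // stale or corrupt: abort it and surface a retryable error so the
    // client re-looks-up the region location and retries elsewhere.
    String next() throws IOException {
      try {
        return readNextFromStoreFile();
      } catch (FileNotFoundException e) {
        server.abort(e.getMessage());
        throw new RetryableException("region is closing");
      }
    }

    // Simulates the low-level read hitting a missing hfile.
    private String readNextFromStoreFile() throws IOException {
      throw new FileNotFoundException("hfile moved or deleted");
    }
  }

  public static void main(String[] args) {
    RegionScanner scanner = new RegionScanner(
        reason -> System.out.println("ABORT: " + reason));
    try {
      scanner.next();
    } catch (IOException e) {
      System.out.println("client sees retryable error: " + e.getMessage());
    }
  }
}
----------------------------------------------------------------------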
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index eadc9e8..5499135 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -3446,6 +3446,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return conf;
   }
 
+  public CacheConfig getCacheConfig() {
+    return cacheConfig;
+  }
+
   /** @return the write lock for the server */
   ReentrantReadWriteLock.WriteLock getWriteLock() {
     return lock.writeLock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 7424788..3bd9f11 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -19,6 +19,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.ArrayList;

http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index b055630..1594a0a 100644
--- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -129,6 +130,8 @@ public class StoreFileScanner implements KeyValueScanner {
         if (hasMVCCInfo)
           skipKVsNewerThanReadpoint();
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch(IOException e) {
       throw new IOException("Could not iterate " + this, e);
     }
@@ -151,6 +154,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this + " to key " + key, ioe);
     }
@@ -171,6 +176,8 @@ public class StoreFileScanner implements KeyValueScanner {
       } finally {
         realSeekDone = true;
       }
+    } catch (FileNotFoundException e) {
+      throw e;
     } catch (IOException ioe) {
       throw new IOException("Could not reseek " + this + " to key " + key, ioe);
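
Note on the StoreFileScanner.java hunks above: each catch block rethrows FileNotFoundException as-is, ahead of the generic handler, because wrapping it in a new IOException("Could not iterate ...") would hide its type from HRegion, which catches FileNotFoundException specifically to trigger the abort path. A small self-contained illustration of that catch ordering (all names here are hypothetical, not HBase APIs):

----------------------------------------------------------------------
import java.io.FileNotFoundException;
import java.io.IOException;

public class CatchOrdering {
  // Simulates the low-level read hitting a missing store file.
  static void lowLevelRead() throws IOException {
    throw new FileNotFoundException("store file vanished");
  }

  static void scan() throws IOException {
    try {
      lowLevelRead();
    } catch (FileNotFoundException e) {
      throw e; // preserve the exact type for the caller
    } catch (IOException e) {
      throw new IOException("Could not iterate", e); // everything else gets context
    }
  }

  public static void main(String[] args) {
    try {
      scan();
    } catch (FileNotFoundException e) {
      // The specific type survives, so the caller can react to it.
      System.out.println("caller still sees FNFE: " + e.getMessage());
    } catch (IOException e) {
      System.out.println("wrapped: " + e.getMessage());
    }
  }
}
----------------------------------------------------------------------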
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca254ed5/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
new file mode 100644
index 0000000..c4e0a7d
--- /dev/null
+++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestCorruptedRegionStoreFile {
+  private static final Log LOG = LogFactory.getLog(TestCorruptedRegionStoreFile.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  private static final int NUM_FILES = 25;
+  private static final int ROW_PER_FILE = 2000;
+  private static final int NUM_ROWS = NUM_FILES * ROW_PER_FILE;
+
+  private final ArrayList<Path> storeFiles = new ArrayList<Path>();
+  private Path tableDir;
+  private int rowCount;
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+  }
+
+  private void setupTable(final String tableName) throws Exception {
+    // load the table
+    HTable table = UTIL.createTable(Bytes.toBytes(tableName), FAMILY_NAME);
+    try {
+      rowCount = 0;
+      byte[] value = new byte[1024];
+      byte[] q = Bytes.toBytes("q");
+      while (rowCount < NUM_ROWS) {
+        Put put = new Put(Bytes.toBytes(String.format("%010d", rowCount)));
+        put.setDurability(Durability.SKIP_WAL);
+        put.add(FAMILY_NAME, q, value);
+        table.put(put);
+
+        if ((rowCount++ % ROW_PER_FILE) == 0) {
+          // flush it
+          UTIL.getHBaseAdmin().flush(tableName);
+        }
+      }
+    } finally {
+      UTIL.getHBaseAdmin().flush(tableName);
+      table.close();
+    }
+
+    assertEquals(NUM_ROWS, rowCount);
+
+    // get the store file paths
+    storeFiles.clear();
+    tableDir = FSUtils.getTablePath(getRootDir(), tableName);
+    FSVisitor.visitTableStoreFiles(getFileSystem(), tableDir, new FSVisitor.StoreFileVisitor() {
+      @Override
+      public void storeFile(final String region, final String family, final String hfile)
+          throws IOException {
+        HFileLink link = HFileLink.create(UTIL.getConfiguration(), tableName, region, family, hfile);
+        storeFiles.add(link.getOriginPath());
+      }
+    });
+    assertTrue("expected at least 1 store file", storeFiles.size() > 0);
+    LOG.info("store-files: " + storeFiles);
+  }
+
+  @Before
+  public void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(2, 3);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=90000)
+  public void testLosingFileDuringScan() throws Exception {
+    final String tableName = "testLosingFileDuringScan";
+    setupTable(tableName);
+    assertEquals(rowCount, fullScanAndCount(tableName));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(tableName, new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScanNext(HTable table) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  @Test(timeout=90000)
+  public void testLosingFileAfterScannerInit() throws Exception {
+    final String tableName = "testLosingFileAfterScannerInit";
+    setupTable(tableName);
+    assertEquals(rowCount, fullScanAndCount(tableName));
+
+    final FileSystem fs = getFileSystem();
+    final Path tmpStoreFilePath = new Path(UTIL.getDataTestDir(), "corruptedHFile");
+
+    // try to query with the missing file
+    int count = fullScanAndCount(tableName, new ScanInjector() {
+      private boolean hasFile = true;
+
+      @Override
+      public void beforeScan(HTable table, Scan scan) throws Exception {
+        // move the path away (now the region is corrupted)
+        if (hasFile) {
+          fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath);
+          LOG.info("Move file to local");
+          evictHFileCache(storeFiles.get(0));
+          hasFile = false;
+        }
+      }
+    });
+    assertTrue("expected one file lost: rowCount=" + count, count >= (NUM_ROWS - ROW_PER_FILE));
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  private void evictHFileCache(final Path hfile) throws Exception {
+    for (RegionServerThread rst: UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
+      HRegionServer rs = rst.getRegionServer();
+      rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName());
+    }
+    Thread.sleep(6000);
+  }
+
+  private int fullScanAndCount(final String tableName) throws Exception {
+    return fullScanAndCount(tableName, new ScanInjector());
+  }
+
+  private int fullScanAndCount(final String tableName, final ScanInjector injector)
+      throws Exception {
+    HTable table = new HTable(UTIL.getConfiguration(), tableName);
+    int count = 0;
+    try {
+      Scan scan = new Scan();
+      scan.setCaching(1);
+      scan.setCacheBlocks(false);
+      injector.beforeScan(table, scan);
+      ResultScanner scanner = table.getScanner(scan);
+      try {
+        while (true) {
+          injector.beforeScanNext(table);
+          Result result = scanner.next();
+          injector.afterScanNext(table, result);
+          if (result == null) break;
+          if ((count++ % 1000) == 0) {
+            LOG.debug("scan next " + count);
+          }
+        }
+      } finally {
+        scanner.close();
+        injector.afterScan(table);
+      }
+    } finally {
+      table.close();
+    }
+    return count;
+  }
+
+  private class ScanInjector {
+    protected void beforeScan(HTable table, Scan scan) throws Exception {}
+    protected void beforeScanNext(HTable table) throws Exception {}
+    protected void afterScanNext(HTable table, Result result) throws Exception {}
+    protected void afterScan(HTable table) throws Exception {}
+  }
+}
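
Note on the test above: fault injection is driven through the no-op ScanInjector base class at the bottom of the file. Each test overrides only the hook it needs (beforeScan to lose the store file before the scanners open, beforeScanNext to lose it mid-scan) while fullScanAndCount() stays generic. A stripped-down, self-contained sketch of that hook pattern, with illustrative names rather than HBase APIs:

----------------------------------------------------------------------
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class InjectorSketch {
  // No-op base class: callbacks fire around each step of the scan loop,
  // so a test overrides only the hook it needs.
  static class Injector {
    void beforeNext() throws Exception {}
    void afterNext(String row) throws Exception {}
  }

  static int scanAndCount(List<String> rows, Injector injector) throws Exception {
    int count = 0;
    Iterator<String> it = rows.iterator();
    while (true) {
      injector.beforeNext();                 // a fault injected here fires mid-scan
      String row = it.hasNext() ? it.next() : null;
      injector.afterNext(row);
      if (row == null) break;
      count++;
    }
    return count;
  }

  public static void main(String[] args) throws Exception {
    List<String> rows = Arrays.asList("r1", "r2", "r3");
    int count = scanAndCount(rows, new Injector() {
      private boolean injected = false;      // one-shot flag, like the test's hasFile
      @Override
      void beforeNext() {
        if (!injected) {
          System.out.println("injecting fault before first next()");
          injected = true;
        }
      }
    });
    System.out.println("rows seen: " + count);
  }
}
----------------------------------------------------------------------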