Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AA542200C31 for ; Wed, 8 Mar 2017 08:10:45 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id A8D19160B86; Wed, 8 Mar 2017 07:10:45 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 2D5DD160B75 for ; Wed, 8 Mar 2017 08:10:44 +0100 (CET) Received: (qmail 58018 invoked by uid 500); 8 Mar 2017 07:10:43 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 58009 invoked by uid 99); 8 Mar 2017 07:10:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 08 Mar 2017 07:10:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3D241DFA6F; Wed, 8 Mar 2017 07:10:43 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Message-Id: <39029020f06d42078c5452566d36c9c6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound Date: Wed, 8 Mar 2017 07:10:43 +0000 (UTC) archived-at: Wed, 08 Mar 2017 07:10:45 -0000 Repository: hbase Updated Branches: refs/heads/branch-1 5f630935b -> dcaa9bd71 HBASE-17712 Remove/Simplify the logic of RegionScannerImpl.handleFileNotFound Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: 
http://git-wip-us.apache.org/repos/asf/hbase/commit/dcaa9bd7 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dcaa9bd7 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dcaa9bd7 Branch: refs/heads/branch-1 Commit: dcaa9bd7155ef6f2003bdb780239499fc450fc1e Parents: 5f63093 Author: zhangduo Authored: Mon Mar 6 21:00:50 2017 +0800 Committer: zhangduo Committed: Wed Mar 8 14:51:11 2017 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/HRegion.java | 189 +++++++------------ .../hbase/regionserver/HRegionServer.java | 5 + .../regionserver/RegionServerServices.java | 17 +- .../hbase/regionserver/RegionUnassigner.java | 68 +++++++ .../hadoop/hbase/MockRegionServerServices.java | 4 + .../hadoop/hbase/master/MockRegionServer.java | 4 + .../TestCompactionInDeadRegionServer.java | 143 ++++++++++++++ .../TestCorruptedRegionStoreFile.java | 28 ++- 8 files changed, 329 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index b616176..14ddbb3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -79,8 +79,6 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import javax.jws.soap.SOAPBinding.Use; - import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -217,6 +215,9 @@ public class 
HRegion implements HeapSize, PropagatingConfigurationObserver, Regi public static final String HREGION_MVCC_PRE_ASSIGN = "hbase.hregion.mvcc.preassign"; public static final boolean DEFAULT_HREGION_MVCC_PRE_ASSIGN = true; + public static final String HREGION_UNASSIGN_FOR_FNFE = "hbase.hregion.unassign.for.fnfe"; + public static final boolean DEFAULT_HREGION_UNASSIGN_FOR_FNFE = true; + /** * Longest time we'll wait on a sequenceid. * Sequenceid comes up out of the WAL subsystem. WAL subsystem can go bad or a test might use @@ -265,9 +266,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi protected volatile long lastReplayedOpenRegionSeqId = -1L; protected volatile long lastReplayedCompactionSeqId = -1L; - // collects Map(s) of Store to sequence Id when handleFileNotFound() is involved - protected List storeSeqIds = new ArrayList<>(); - ////////////////////////////////////////////////////////////////////////////// // Members ////////////////////////////////////////////////////////////////////////////// @@ -670,6 +668,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi private final boolean mvccPreAssign; private final ReentrantLock preAssignMvccLock; + // whether to unassign region if we hit FNFE + private final RegionUnassigner regionUnassigner; /** * HRegion constructor. This constructor should only be used for testing and * extensions. 
Instances of HRegion should be instantiated with the @@ -827,6 +827,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } else { this.preAssignMvccLock = null; } + boolean unassignForFNFE = + conf.getBoolean(HREGION_UNASSIGN_FOR_FNFE, DEFAULT_HREGION_UNASSIGN_FOR_FNFE); + if (unassignForFNFE) { + this.regionUnassigner = new RegionUnassigner(rsServices, fs.getRegionInfo()); + } else { + this.regionUnassigner = null; + } } void setHTableSpecificConf() { @@ -1496,7 +1503,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi status.setStatus("Running coprocessor pre-close hooks"); this.coprocessorHost.preClose(abort); } - status.setStatus("Disabling compacts and flushes for region"); boolean canFlush = true; synchronized (writestate) { @@ -1637,7 +1643,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (this.metricsRegionWrapper != null) { Closeables.closeQuietly(this.metricsRegionWrapper); } - status.markComplete("Closed"); LOG.info("Closed " + this); return result; @@ -5179,15 +5184,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NN_NAKED_NOTIFY", + justification = "Notify is about post replay. Intentional") @Override public boolean refreshStoreFiles() throws IOException { - return refreshStoreFiles(false); - } - - @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NN_NAKED_NOTIFY", - justification="Notify is about post replay. 
Intentional") - protected boolean refreshStoreFiles(boolean force) throws IOException { - if (!force && ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { return false; // if primary nothing to do } @@ -5219,7 +5220,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // see whether we can drop the memstore or the snapshot if (storeSeqId > maxSeqIdBefore) { - if (writestate.flushing) { // only drop memstore snapshots if they are smaller than last flush for the store if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) { @@ -5259,16 +5259,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } if (!map.isEmpty()) { - if (!force) { - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()); - } - } else { - synchronized (storeSeqIds) { - // don't try to acquire write lock of updatesLock now - storeSeqIds.add(map); - } + for (Map.Entry entry : map.entrySet()) { + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()); } } // C. 
Finally notify anyone waiting on memstore to clear: @@ -5945,12 +5938,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi try { for (Map.Entry> entry : scan.getFamilyMap().entrySet()) { Store store = stores.get(entry.getKey()); - KeyValueScanner scanner; - try { - scanner = store.getScanner(scan, entry.getValue(), this.readPt); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt); instantiatedScanners.add(scanner); if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand() || this.filter.isFamilyEssential(entry.getKey())) { @@ -5974,8 +5962,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } + private void handleFileNotFound(Throwable fnfe) { + // Try reopening the region since we have lost some storefiles. + // See HBASE-17712 for more details. + LOG.warn("A store file got lost", fnfe); + if (regionUnassigner != null) { + regionUnassigner.unassign(); + } + } + private IOException handleException(List instantiatedScanners, Throwable t) { + if (t instanceof FileNotFoundException) { + handleFileNotFound(t); + } // remove scaner read point before throw the exception scannerReadPoints.remove(this); if (storeHeap != null) { @@ -6057,15 +6057,20 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // scanner is closed throw new UnknownScannerException("Scanner was closed"); } - boolean moreValues; - if (outResults.isEmpty()) { - // Usually outResults is empty. This is true when next is called - // to handle scan or get operation. - moreValues = nextInternal(outResults, scannerContext); - } else { - List tmpList = new ArrayList(); - moreValues = nextInternal(tmpList, scannerContext); - outResults.addAll(tmpList); + boolean moreValues = false; + try { + if (outResults.isEmpty()) { + // Usually outResults is empty. 
This is true when next is called + // to handle scan or get operation. + moreValues = nextInternal(outResults, scannerContext); + } else { + List tmpList = new ArrayList(); + moreValues = nextInternal(tmpList, scannerContext); + outResults.addAll(tmpList); + } + } catch (FileNotFoundException e) { + handleFileNotFound(e); + throw e; } // If the size limit was reached it means a partial Result is being returned. Returning a @@ -6120,34 +6125,30 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi boolean tmpKeepProgress = scannerContext.getKeepProgress(); // Scanning between column families and thus the scope is between cells LimitScope limitScope = LimitScope.BETWEEN_CELLS; - try { - do { - // We want to maintain any progress that is made towards the limits while scanning across - // different column families. To do this, we toggle the keep progress flag on during calls - // to the StoreScanner to ensure that any progress made thus far is not wiped away. - scannerContext.setKeepProgress(true); - heap.next(results, scannerContext); - scannerContext.setKeepProgress(tmpKeepProgress); - - nextKv = heap.peek(); - moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); - if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); - - if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { - return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); - } else if (scannerContext.checkSizeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? NextState.SIZE_LIMIT_REACHED_MID_ROW: NextState.SIZE_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } else if (scannerContext.checkTimeLimit(limitScope)) { - ScannerContext.NextState state = - moreCellsInRow? 
NextState.TIME_LIMIT_REACHED_MID_ROW: NextState.TIME_LIMIT_REACHED; - return scannerContext.setScannerState(state).hasMoreValues(); - } - } while (moreCellsInRow); - } catch (FileNotFoundException e) { - throw handleFileNotFound(e); - } + do { + // We want to maintain any progress that is made towards the limits while scanning across + // different column families. To do this, we toggle the keep progress flag on during calls + // to the StoreScanner to ensure that any progress made thus far is not wiped away. + scannerContext.setKeepProgress(true); + heap.next(results, scannerContext); + scannerContext.setKeepProgress(tmpKeepProgress); + + nextKv = heap.peek(); + moreCellsInRow = moreCellsInRow(nextKv, currentRow, offset, length); + if (!moreCellsInRow) incrementCountOfRowsScannedMetric(scannerContext); + + if (moreCellsInRow && scannerContext.checkBatchLimit(limitScope)) { + return scannerContext.setScannerState(NextState.BATCH_LIMIT_REACHED).hasMoreValues(); + } else if (scannerContext.checkSizeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? NextState.SIZE_LIMIT_REACHED_MID_ROW : NextState.SIZE_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } else if (scannerContext.checkTimeLimit(limitScope)) { + ScannerContext.NextState state = + moreCellsInRow ? 
NextState.TIME_LIMIT_REACHED_MID_ROW : NextState.TIME_LIMIT_REACHED; + return scannerContext.setScannerState(state).hasMoreValues(); + } + } while (moreCellsInRow); return nextKv != null; } @@ -6510,28 +6511,14 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi result = this.joinedHeap.requestSeek(kv, true, true) || result; } } catch (FileNotFoundException e) { - throw handleFileNotFound(e); + handleFileNotFound(e); + throw e; } finally { closeRegionOperation(); } return result; } - private IOException handleFileNotFound(FileNotFoundException fnfe) throws IOException { - // tries to refresh the store files, otherwise shutdown the RS. - // TODO: add support for abort() of a single region and trigger reassignment. - try { - region.refreshStoreFiles(true); - return new IOException("unable to read store file"); - } catch (IOException e) { - String msg = "a store file got lost: " + fnfe.getMessage(); - LOG.error(msg); - LOG.error("unable to refresh store files", e); - abortRegionServer(msg); - return new NotServingRegionException(getRegionInfo().getRegionNameAsString() + " closing"); - } - } - private void abortRegionServer(String msg) throws IOException { if (rsServices instanceof HRegionServer) { ((HRegionServer)rsServices).abort(msg); @@ -7591,32 +7578,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE); } - // TODO: There's a lot of boiler plate code identical to increment. - // We should refactor append and increment as local get-mutate-put - // transactions, so all stores only go through one code path for puts. 
- - // dropMemstoreContentsForSeqId() would acquire write lock of updatesLock - // We perform this operation outside of the read lock of updatesLock to avoid dead lock - // See HBASE-16304 - @SuppressWarnings("unchecked") - private void dropMemstoreContents() throws IOException { - long totalFreedSize = 0; - while (!storeSeqIds.isEmpty()) { - Map map = null; - synchronized (storeSeqIds) { - if (storeSeqIds.isEmpty()) break; - map = storeSeqIds.remove(storeSeqIds.size()-1); - } - for (Map.Entry entry : map.entrySet()) { - // Drop the memstore contents if they are now smaller than the latest seen flushed file - totalFreedSize += dropMemstoreContentsForSeqId(entry.getValue(), entry.getKey()); - } - } - if (totalFreedSize > 0) { - LOG.debug("Freed " + totalFreedSize + " bytes from memstore"); - } - } - @Override public Result append(Append mutate, long nonceGroup, long nonce) throws IOException { Operation op = Operation.APPEND; @@ -7776,10 +7737,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { this.updatesLock.readLock().unlock(); - // For increment/append, a region scanner for doing a get operation could throw - // FileNotFoundException. So we call dropMemstoreContents() in finally block - // after releasing read lock - dropMemstoreContents(); } } finally { @@ -8010,10 +7967,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } finally { this.updatesLock.readLock().unlock(); - // For increment/append, a region scanner for doing a get operation could throw - // FileNotFoundException. 
So we call dropMemstoreContents() in finally block - // after releasing read lock - dropMemstoreContents(); } } finally { rowLock.release(); http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 39b6380..11621e2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -3486,4 +3486,9 @@ public class HRegionServer extends HasThread implements public MetricsRegionServer getMetrics() { return metricsRegionServer; } + + @Override + public void unassign(byte[] regionName) throws IOException { + clusterConnection.getAdmin().unassign(regionName, false); + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index 99a7b12..0e62a94 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -18,8 +18,9 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.protobuf.Service; + import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentMap; @@ -34,13 +35,11 @@ import org.apache.hadoop.hbase.executor.ExecutorService; 
import org.apache.hadoop.hbase.ipc.RpcServerInterface; import org.apache.hadoop.hbase.master.TableLockManager; import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; +import org.apache.hadoop.hbase.wal.WAL; import org.apache.zookeeper.KeeperException; -import com.google.protobuf.Service; - /** * Services provided by {@link HRegionServer} */ @@ -249,4 +248,14 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi * @return the metrics tracker for the region server */ MetricsRegionServer getMetrics(); + + /** + * Unassign the given region from the current regionserver and assign it randomly. Could still be + * assigned to us. This is used to solve some tough problems for which you need to reset the state + * of a region. For example, if you hit FileNotFound exception and want to refresh the store file + * list. + *

+ * See HBASE-17712 for more details. + */ + void unassign(byte[] regionName) throws IOException; } http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java new file mode 100644 index 0000000..b347b4b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionUnassigner.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Used to unassign a region when we hit FNFE. 
+ */ +@InterfaceAudience.Private +class RegionUnassigner { + + private static final Log LOG = LogFactory.getLog(RegionUnassigner.class); + + private final RegionServerServices rsServices; + + private final HRegionInfo regionInfo; + + private boolean unassigning = false; + + RegionUnassigner(RegionServerServices rsServices, HRegionInfo regionInfo) { + this.rsServices = rsServices; + this.regionInfo = regionInfo; + } + + synchronized void unassign() { + if (unassigning) { + return; + } + unassigning = true; + new Thread("Unassign-" + regionInfo) { + + @Override + public void run() { + LOG.info("Unassign " + regionInfo.getRegionNameAsString()); + try { + rsServices.unassign(regionInfo.getRegionName()); + } catch (IOException e) { + LOG.warn("Unassigned " + regionInfo.getRegionNameAsString() + " failed", e); + } finally { + synchronized (RegionUnassigner.this) { + unassigning = false; + } + } + } + }.start(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index fa22eea..4e6eab3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -326,4 +326,8 @@ public class MockRegionServerServices implements RegionServerServices { public MetricsRegionServer getMetrics() { return null; } + + @Override + public void unassign(byte[] regionName) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java ---------------------------------------------------------------------- diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 90752a9..5cb7e8f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -670,4 +670,8 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices { public MetricsRegionServer getMetrics() { return null; } + + @Override + public void unassign(byte[] regionName) throws IOException { + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java new file mode 100644 index 0000000..cebae4f --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionInDeadRegionServer.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CoordinatedStateManager; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Waiter.ExplainingPredicate; +import org.apache.hadoop.hbase.YouAreDeadException; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * This testcase is used to ensure that the compaction marker will fail a compaction if the RS is + * already dead. It can not eliminate FNFE when scanning but it does reduce the possibility a lot. 
+ */ +@Category({ RegionServerTests.class, LargeTests.class }) +public class TestCompactionInDeadRegionServer { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + public static final class IgnoreYouAreDeadRS extends HRegionServer { + + public IgnoreYouAreDeadRS(Configuration conf) throws IOException, InterruptedException { + super(conf); + } + + public IgnoreYouAreDeadRS(Configuration conf, CoordinatedStateManager csm) + throws IOException, InterruptedException { + super(conf, csm); + } + + @Override + protected void tryRegionServerReport(long reportStartTime, long reportEndTime) + throws IOException { + try { + super.tryRegionServerReport(reportStartTime, reportEndTime); + } catch (YouAreDeadException e) { + // ignore, do not abort + } + } + } + + @Before + public void setUp() throws Exception { + UTIL.getConfiguration().setInt(HConstants.ZK_SESSION_TIMEOUT, 2000); + UTIL.getConfiguration().setClass(HConstants.REGION_SERVER_IMPL, IgnoreYouAreDeadRS.class, + HRegionServer.class); + UTIL.startMiniCluster(2); + Table table = UTIL.createTable(TABLE_NAME, CF); + for (int i = 0; i < 10; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getHBaseAdmin().flush(TABLE_NAME); + for (int i = 10; i < 20; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + UTIL.getHBaseAdmin().flush(TABLE_NAME); + } + + @After + public void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Test + public void test() throws Exception { + final HRegionServer rsToSuspend = UTIL.getRSForFirstRegionInTable(TABLE_NAME); + HRegion region = (HRegion) rsToSuspend.getOnlineRegions(TABLE_NAME).get(0); + ZooKeeperWatcher watcher = UTIL.getZooKeeperWatcher(); + 
watcher.getRecoverableZooKeeper() + .delete(ZKUtil.joinZNode(watcher.rsZNode, rsToSuspend.getServerName().toString()), -1); + UTIL.waitFor(60000, 1000, new ExplainingPredicate() { + + @Override + public boolean evaluate() throws Exception { + for (RegionServerThread thread : UTIL.getHBaseCluster().getRegionServerThreads()) { + HRegionServer rs = thread.getRegionServer(); + if (rs != rsToSuspend) { + return !rs.getOnlineRegions(TABLE_NAME).isEmpty(); + } + } + return false; + } + + @Override + public String explainFailure() throws Exception { + return "The region for " + TABLE_NAME + " is still on " + rsToSuspend.getServerName(); + } + }); + try { + region.compact(true); + fail("Should fail as our wal file has already been closed, " + + "and walDir has also been renamed"); + } catch (Exception e) { + // expected + } + Table table = UTIL.getConnection().getTable(TABLE_NAME); + // should not hit FNFE + for (int i = 0; i < 20; i++) { + assertEquals(i, Bytes.toInt(table.get(new Get(Bytes.toBytes(i))).getValue(CF, CQ))); + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/dcaa9bd7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java index 969ef34..daa03b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCorruptedRegionStoreFile.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import 
org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseTestingUtility; @@ -135,6 +137,23 @@ public class TestCorruptedRegionStoreFile { } } + private void removeStoreFile(FileSystem fs, Path tmpStoreFilePath) throws Exception { + try (FSDataInputStream input = fs.open(storeFiles.get(0))) { + fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); + LOG.info("Move file to local"); + evictHFileCache(storeFiles.get(0)); + // make sure that all the replicas have been deleted on DNs. + for (;;) { + try { + input.read(0, new byte[1], 0, 1); + } catch (FileNotFoundException e) { + break; + } + Thread.sleep(1000); + } + } + } + @Test(timeout=180000) public void testLosingFileDuringScan() throws Exception { assertEquals(rowCount, fullScanAndCount(TEST_TABLE.getTableName())); @@ -150,9 +169,7 @@ public class TestCorruptedRegionStoreFile { public void beforeScanNext(Table table) throws Exception { // move the path away (now the region is corrupted) if (hasFile) { - fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); - LOG.info("Move file to local"); - evictHFileCache(storeFiles.get(0)); + removeStoreFile(fs, tmpStoreFilePath); hasFile = false; } } @@ -176,9 +193,7 @@ public class TestCorruptedRegionStoreFile { public void beforeScan(Table table, Scan scan) throws Exception { // move the path away (now the region is corrupted) if (hasFile) { - fs.copyToLocalFile(true, storeFiles.get(0), tmpStoreFilePath); - LOG.info("Move file to local"); - evictHFileCache(storeFiles.get(0)); + removeStoreFile(fs, tmpStoreFilePath); hasFile = false; } } @@ -203,7 +218,6 @@ public class TestCorruptedRegionStoreFile { HRegionServer rs = rst.getRegionServer(); rs.getCacheConfig().getBlockCache().evictBlocksByHfileName(hfile.getName()); } - Thread.sleep(6000); } private int fullScanAndCount(final TableName tableName) throws Exception {