Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CDBD7200D2F for ; Wed, 18 Oct 2017 05:15:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CC1901609EC; Wed, 18 Oct 2017 03:15:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 784431609EB for ; Wed, 18 Oct 2017 05:15:01 +0200 (CEST) Received: (qmail 693 invoked by uid 500); 18 Oct 2017 03:15:00 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 675 invoked by uid 99); 18 Oct 2017 03:15:00 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Oct 2017 03:15:00 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 229FBDFB3D; Wed, 18 Oct 2017 03:15:00 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zhangduo@apache.org To: commits@hbase.apache.org Date: Wed, 18 Oct 2017 03:15:01 -0000 Message-Id: <6b51e219ee00406dbcaf4115e8fab7e0@git.apache.org> In-Reply-To: <4c1003423f9b4249a82e73759ec2a332@git.apache.org> References: <4c1003423f9b4249a82e73759ec2a332@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] hbase git commit: HBASE-19001 Remove the hooks in RegionObserver which are designed to construct a StoreScanner which is marked as IA.Private archived-at: Wed, 18 Oct 2017 03:15:04 -0000 HBASE-19001 Remove the hooks in RegionObserver which are designed to construct a StoreScanner which is marked as IA.Private Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e804dd0b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e804dd0b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e804dd0b Branch: refs/heads/master Commit: e804dd0b600f898f7519dee7134b68ad04c20a9a Parents: 5368fd5 Author: zhangduo Authored: Tue Oct 17 21:27:05 2017 +0800 Committer: zhangduo Committed: Wed Oct 18 11:06:39 2017 +0800 ---------------------------------------------------------------------- .../hbase/coprocessor/RegionObserver.java | 77 ----- .../hadoop/hbase/regionserver/HMobStore.java | 24 +- .../hadoop/hbase/regionserver/HRegion.java | 4 +- .../hadoop/hbase/regionserver/HStore.java | 18 +- .../hadoop/hbase/regionserver/Region.java | 3 - .../regionserver/RegionCoprocessorHost.java | 64 +--- .../regionserver/ReversedStoreScanner.java | 6 +- .../hadoop/hbase/regionserver/StoreFlusher.java | 12 +- .../regionserver/compactions/Compactor.java | 44 +-- ...estAvoidCellReferencesIntoShippedBlocks.java | 197 ++++++------- .../hadoop/hbase/client/TestFromClientSide.java | 156 ---------- .../client/TestFromClientSideScanExcpetion.java | 238 +++++++++++++++ ...mClientSideScanExcpetionWithCoprocessor.java | 43 +++ .../hbase/coprocessor/SimpleRegionObserver.java | 36 --- .../TestRegionObserverScannerOpenHook.java | 31 +- .../regionserver/DelegatingInternalScanner.java | 45 +++ .../regionserver/NoOpScanPolicyObserver.java | 60 +--- .../hbase/util/TestCoprocessorScanPolicy.java | 290 +++++++++++-------- 18 files changed, 647 insertions(+), 701 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index a1e4f0e..d03a9be 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -124,27 +124,6 @@ public interface RegionObserver { default void postLogReplay(ObserverContext c) {} /** - * Called before a memstore is flushed to disk and prior to creating the scanner to read from - * the memstore. To override or modify how a memstore is flushed, - * implementing classes can return a new scanner to provide the KeyValues to be - * stored into the new {@code StoreFile} or null to perform the default processing. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - * @param c the environment provided by the region server - * @param store the store being flushed - * @param scanners the scanners for the memstore that is flushed - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param readPoint the readpoint to create scanner - * @return the scanner to use during the flush. {@code null} if the default implementation - * is to be used. - */ - default InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - return s; - } - - /** * Called before the memstore is flushed to disk. * @param c the environment provided by the region server */ @@ -236,33 +215,6 @@ public interface RegionObserver { } /** - * Called prior to writing the {@link StoreFile}s selected for compaction into a new - * {@code StoreFile} and prior to creating the scanner used to read the input files. To override - * or modify the compaction process, implementing classes can return a new scanner to provide the - * KeyValues to be stored into the new {@code StoreFile} or null to perform the default - * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - * @param c the environment provided by the region server - * @param store the store being compacted - * @param scanners the list of store file scanners to be read from - * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction - * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store - * files - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param tracker used to track the life cycle of a compaction - * @param request the requested compaction - * @param readPoint the readpoint to create scanner - * @return the scanner to use during compaction. {@code null} if the default implementation is to - * be used. - */ - default InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - return s; - } - - /** * Called after compaction has completed and the new store file has been moved in to place. * @param c the environment provided by the region server * @param store the store being compacted @@ -803,35 +755,6 @@ public interface RegionObserver { } /** - * Called before a store opens a new scanner. - * This hook is called when a "user" scanner is opened. - *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} - * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, - * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} to override scanners - * created for flushes or compactions, resp. - *

- * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors. - * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no - * effect in this hook. - *

- * Note: Do not retain references to any Cells returned by scanner, beyond the life of this - * invocation. If need a Cell reference for later use, copy the cell and use that. - * @param c the environment provided by the region server - * @param store the store being scanned - * @param scan the Scan specification - * @param targetCols columns to be used in the scanner - * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain - * @param readPt the read point - * @return a KeyValueScanner instance to use or {@code null} to use the default implementation - */ - default KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - return s; - } - - /** * Called after the client opens a new scanner. *

* Call CoprocessorEnvironment#complete to skip any subsequent chained http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java index 95bbf74..206c3cd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java @@ -145,21 +145,19 @@ public class HMobStore extends HStore { */ @Override protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt, KeyValueScanner scanner) throws IOException { - if (scanner == null) { - if (MobUtils.isRefOnlyScan(scan)) { - Filter refOnlyFilter = new MobReferenceOnlyFilter(); - Filter filter = scan.getFilter(); - if (filter != null) { - scan.setFilter(new FilterList(filter, refOnlyFilter)); - } else { - scan.setFilter(refOnlyFilter); - } + long readPt) throws IOException { + if (MobUtils.isRefOnlyScan(scan)) { + Filter refOnlyFilter = new MobReferenceOnlyFilter(); + Filter filter = scan.getFilter(); + if (filter != null) { + scan.setFilter(new FilterList(filter, refOnlyFilter)); + } else { + scan.setFilter(refOnlyFilter); } - scanner = scan.isReversed() ? new ReversedMobStoreScanner(this, getScanInfo(), scan, - targetCols, readPt) : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); } - return scanner; + return scan.isReversed() + ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt) + : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 1cbb689..da3a1e9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -1446,7 +1446,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi return maxFlushedSeqId; } - @Override + /** + * @return readpoint considering given IsolationLevel. Pass {@code null} for default + */ public long getReadPoint(IsolationLevel isolationLevel) { if (isolationLevel != null && isolationLevel == IsolationLevel.READ_UNCOMMITTED) { // This scan can read even uncommitted transactions http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 186608f..83b5561 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1920,25 +1920,17 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat final NavigableSet targetCols, long readPt) throws IOException { lock.readLock().lock(); try { - KeyValueScanner scanner = null; - if (this.getCoprocessorHost() != null) { - scanner = this.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols, readPt); - } - scanner = createScanner(scan, targetCols, readPt, scanner); - return scanner; + return createScanner(scan, targetCols, readPt); } finally { lock.readLock().unlock(); } } protected KeyValueScanner createScanner(Scan scan, final NavigableSet targetCols, - long readPt, KeyValueScanner scanner) throws IOException { - if (scanner == null) { - scanner = scan.isReversed() ? new ReversedStoreScanner(this, - getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, - getScanInfo(), scan, targetCols, readPt); - } - return scanner; + long readPt) throws IOException { + return scan.isReversed() ? new ReversedStoreScanner(this, + getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this, + getScanInfo(), scan, targetCols, readPt); } /** http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 630ae80..79012ea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -131,9 +131,6 @@ public interface Region extends ConfigurationObserver { */ public Map getMaxStoreSeqId(); - /** @return readpoint considering given IsolationLevel; pass null for default*/ - long getReadPoint(IsolationLevel isolationLevel); - /** * @return The earliest time a store in the region was flushed. All * other stores in the region would have been flushed either at, or http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index b78d95b..fbd93b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -19,18 +19,18 @@ package org.apache.hadoop.hbase.regionserver; +import com.google.protobuf.Message; +import com.google.protobuf.Service; + import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.NavigableSet; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.regex.Matcher; -import com.google.protobuf.Message; -import com.google.protobuf.Service; import org.apache.commons.collections4.map.AbstractReferenceMap; import org.apache.commons.collections4.map.ReferenceMap; import org.apache.commons.logging.Log; @@ -42,7 +42,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Append; import org.apache.hadoop.hbase.client.Delete; @@ -77,15 +76,15 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; -import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CoprocessorClassLoader; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; + +import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; /** * Implements the coprocessor environment and runtime support for coprocessors @@ -542,27 +541,6 @@ public class RegionCoprocessorHost } /** - * See - * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, - * InternalScanner, CompactionLifeCycleTracker, CompactionRequest, long)} - */ - public InternalScanner preCompactScannerOpen(final HStore store, - final List scanners, final ScanType scanType, final long earliestPutTs, - final CompactionLifeCycleTracker tracker, final CompactionRequest request, final User user, - final long readPoint) - throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult( - regionObserverGetter, user) { - @Override - public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preCompactScannerOpen(this, store, scanners, scanType, - earliestPutTs, getResult(), tracker, request, readPoint); - } - }); - } - - /** * Called prior to selecting the {@link HStoreFile}s for compaction from the list of currently * available candidates. * @param store The store where compaction is being requested @@ -674,21 +652,6 @@ public class RegionCoprocessorHost } /** - * See - * {@link RegionObserver#preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)} - */ - public InternalScanner preFlushScannerOpen(final HStore store, - final List scanners, final long readPoint) throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { - @Override - public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preFlushScannerOpen(this, store, scanners, getResult(), readPoint); - } - }); - } - - /** * Invoked after a memstore flush * @throws IOException */ @@ -1159,21 +1122,6 @@ public class RegionCoprocessorHost } /** - * See - * {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner, long)} - */ - public KeyValueScanner preStoreScannerOpen(final HStore store, final Scan scan, - final NavigableSet targetCols, final long readPt) throws IOException { - return execOperationWithResult(null, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { - @Override - public KeyValueScanner call(RegionObserver observer) throws IOException { - return observer.preStoreScannerOpen(this, store, scan, targetCols, getResult(), readPt); - } - }); - } - - /** * @param scan the Scan specification * @param s the scanner * @return the scanner instance to use http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index 0089d3f..04e6865 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -34,7 +34,7 @@ import org.apache.yetus.audience.InterfaceAudience; * reversed scanning. */ @InterfaceAudience.Private -class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { +public class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { /** * Opens a scanner across memstore, snapshot, and all StoreFiles. Assumes we @@ -46,14 +46,14 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { * @param columns which columns we are scanning * @throws IOException */ - ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, + public ReversedStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, long readPt) throws IOException { super(store, scanInfo, scan, columns, readPt); } /** Constructor for testing. */ - ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, + public ReversedStoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet columns, List scanners) throws IOException { super(scan, scanInfo, columns, scanners); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 124b7b5..8fde7d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -79,15 +79,9 @@ abstract class StoreFlusher { */ protected InternalScanner createScanner(List snapshotScanners, long smallestReadPoint) throws IOException { - InternalScanner scanner = null; - if (store.getCoprocessorHost() != null) { - scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanners, - smallestReadPoint); - } - if (scanner == null) { - scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, - ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); - } + InternalScanner scanner = + new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, + ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); assert scanner != null; if (store.getCoprocessorHost() != null) { try { http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 7ca3ab4..f9efd98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -86,8 +86,8 @@ public abstract class Compactor { protected static final String MINOR_COMPACTION_DROP_CACHE = "hbase.regionserver.minorcompaction.pagecache.drop"; - private boolean dropCacheMajor; - private boolean dropCacheMinor; + private final boolean dropCacheMajor; + private final boolean dropCacheMinor; //TODO: depending on Store is not good but, realistically, all compactors currently do. Compactor(Configuration conf, HStore store) { @@ -138,7 +138,7 @@ public abstract class Compactor { * @param allFiles Whether all files are included for compaction * @return The result. */ - protected FileDetails getFileDetails( + private FileDetails getFileDetails( Collection filesToCompact, boolean allFiles) throws IOException { FileDetails fd = new FileDetails(); long oldestHFileTimeStampToKeepMVCC = System.currentTimeMillis() - @@ -217,13 +217,13 @@ public abstract class Compactor { * @param filesToCompact Files. * @return Scanners. */ - protected List createFileScanners(Collection filesToCompact, + private List createFileScanners(Collection filesToCompact, long smallestReadPoint, boolean useDropBehind) throws IOException { return StoreFileScanner.getScannersForCompaction(filesToCompact, useDropBehind, smallestReadPoint); } - protected long getSmallestReadPoint() { + private long getSmallestReadPoint() { return store.getSmallestReadPoint(); } @@ -257,7 +257,7 @@ public abstract class Compactor { * @return Writer for a new StoreFile in the tmp dir. * @throws IOException if creation failed */ - protected StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind) + protected final StoreFileWriter createTmpWriter(FileDetails fd, boolean shouldDropBehind) throws IOException { // When all MVCC readpoints are 0, don't write them. // See HBASE-8166, HBASE-12600, and HBASE-13389. @@ -267,7 +267,7 @@ public abstract class Compactor { /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind); } - protected List compact(final CompactionRequestImpl request, + protected final List compact(final CompactionRequestImpl request, InternalScannerFactory scannerFactory, CellSinkFactory sinkFactory, ThroughputController throughputController, User user) throws IOException { FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles()); @@ -291,12 +291,8 @@ public abstract class Compactor { try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); - scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user, - smallestReadPoint); - if (scanner == null) { - scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); - } - scanner = postCreateCoprocScanner(request, scanType, scanner, user); + scanner = postCreateCoprocScanner(request, scanType, + scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user); if (scanner == null) { // NULL scanner returned from coprocessor hooks means skip normal processing. return new ArrayList<>(); @@ -331,33 +327,13 @@ public abstract class Compactor { protected abstract void abortWriter(T writer) throws IOException; /** - * Calls coprocessor, if any, to create compaction scanner - before normal scanner creation. - * @param request Compaction request. - * @param scanType Scan type. - * @param earliestPutTs Earliest put ts. - * @param scanners File scanners for compaction files. - * @param user the User - * @param readPoint the read point to help create scanner by Coprocessor if required. - * @return Scanner override by coprocessor; null if not overriding. - */ - protected InternalScanner preCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, - long earliestPutTs, List scanners, User user, long readPoint) - throws IOException { - if (store.getCoprocessorHost() == null) { - return null; - } - return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, - earliestPutTs, request.getTracker(), request, user, readPoint); - } - - /** * Calls coprocessor, if any, to create scanners - after normal scanner creation. * @param request Compaction request. * @param scanType Scan type. * @param scanner The default scanner created for compaction. * @return Scanner scanner to use (usually the default); null if compaction should not proceed. */ - protected InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, + private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType, InternalScanner scanner, User user) throws IOException { if (store.getCoprocessorHost() == null) { return scanner; http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java index ac0a4e6..baf0145 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAvoidCellReferencesIntoShippedBlocks.java @@ -24,12 +24,9 @@ import java.util.ArrayList; import java.util.Iterator; import java.util.List; import java.util.Optional; -import java.util.OptionalInt; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparatorImpl; @@ -46,15 +43,13 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache; import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CachedBlock; +import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; -import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -67,10 +62,10 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; +import org.apache.hadoop.hbase.shaded.com.google.common.collect.Iterables; + @Category({ LargeTests.class, ClientTests.class }) -@SuppressWarnings("deprecation") public class TestAvoidCellReferencesIntoShippedBlocks { - private static final Log LOG = LogFactory.getLog(TestAvoidCellReferencesIntoShippedBlocks.class); protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); static byte[][] ROWS = new byte[2][]; private static byte[] ROW = Bytes.toBytes("testRow"); @@ -134,7 +129,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { try { // get the block cache and region RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); - String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName).getRegion(regionName); HStore store = region.getStores().iterator().next(); @@ -190,10 +185,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // Load cache Scan s = new Scan(); s.setMaxResultSize(1000); - ResultScanner scanner = table.getScanner(s); - int count = 0; - for (Result result : scanner) { - count++; + int count; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); // all the cache is loaded @@ -203,10 +197,8 @@ public class TestAvoidCellReferencesIntoShippedBlocks { region.compact(true); s = new Scan(); s.setMaxResultSize(1000); - scanner = table.getScanner(s); - count = 0; - for (Result result : scanner) { - count++; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); } finally { @@ -224,10 +216,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { } public void run() { - Scan s = new Scan(); - s.setCaching(1); - s.setStartRow(ROW4); - s.setStopRow(ROW5); + Scan s = new Scan().withStartRow(ROW4).withStopRow(ROW5).setCaching(1); try { while(!doScan.get()) { try { @@ -246,9 +235,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // evict what ever is available cache.evictBlock(cacheKey); } - ResultScanner scanner = table.getScanner(s); - for (Result res : scanner) { - + try (ResultScanner scanner = table.getScanner(s)) { + while (scanner.next() != null) { + } } compactReadLatch.countDown(); } catch (IOException e) { @@ -264,35 +253,24 @@ public class TestAvoidCellReferencesIntoShippedBlocks { } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, - long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, - CompactionRequest request, long readPoint) - throws IOException { - return createCompactorScanner((HStore) store, scanners, scanType, earliestPutTs); - } - - private InternalScanner createCompactorScanner(HStore store, - List scanners, ScanType scanType, long earliestPutTs) - throws IOException { - return new CompactorStoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, - scanType, store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new CompactorInternalScanner(scanner); } } - private static class CompactorStoreScanner extends StoreScanner { + private static final class CompactorInternalScanner extends DelegatingInternalScanner { - public CompactorStoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions, - List scanners, ScanType scanType, long smallestReadPoint, - long earliestPutTs) throws IOException { - super(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs); + public CompactorInternalScanner(InternalScanner scanner) { + super(scanner); } @Override - public boolean next(List outResult, ScannerContext scannerContext) throws IOException { - boolean next = super.next(outResult, scannerContext); - for (Cell cell : outResult) { - if(CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { + public boolean next(List result, ScannerContext scannerContext) throws IOException { + boolean next = scanner.next(result, scannerContext); + for (Cell cell : result) { + if (CellComparatorImpl.COMPARATOR.compareRows(cell, ROW2, 0, ROW2.length) == 0) { try { // hold the compaction // set doscan to true @@ -314,7 +292,7 @@ public class TestAvoidCellReferencesIntoShippedBlocks { try { // get the block cache and region RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName); - String regionName = locator.getAllRegionLocations().get(0).getRegionInfo().getEncodedName(); + String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName(); HRegion region = (HRegion) TEST_UTIL.getRSForFirstRegionInTable(tableName) .getRegion(regionName); HStore store = region.getStores().iterator().next(); @@ -364,10 +342,9 @@ public class TestAvoidCellReferencesIntoShippedBlocks { // Load cache Scan s = new Scan(); s.setMaxResultSize(1000); - ResultScanner scanner = table.getScanner(s); - int count = 0; - for (Result result : scanner) { - count++; + int count; + try (ResultScanner scanner = table.getScanner(s)) { + count = Iterables.size(scanner); } assertEquals("Count all the rows ", count, 6); @@ -375,77 +352,75 @@ public class TestAvoidCellReferencesIntoShippedBlocks { s = new Scan(); // Start a scan from row3 s.setCaching(1); - s.setStartRow(ROW1); + s.withStartRow(ROW1); // set partial as true so that the scan can send partial columns also s.setAllowPartialResults(true); s.setMaxResultSize(1000); - scanner = table.getScanner(s); - Thread evictorThread = new Thread() { - @Override - public void run() { - List cacheList = new ArrayList<>(); - Iterator iterator = cache.iterator(); - // evict all the blocks - while (iterator.hasNext()) { - CachedBlock next = iterator.next(); - BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - cacheList.add(cacheKey); - cache.evictBlock(cacheKey); - } - try { - Thread.sleep(1); - } catch (InterruptedException e1) { - } - iterator = cache.iterator(); - int refBlockCount = 0; - while (iterator.hasNext()) { - iterator.next(); - refBlockCount++; - } - assertEquals("One block should be there ", refBlockCount, 1); - // Rescan to prepopulate the data - // cache this row. - Scan s1 = new Scan(); - // This scan will start from ROW1 and it will populate the cache with a - // row that is lower than ROW3. - s1.setStartRow(ROW3); - s1.setStopRow(ROW5); - s1.setCaching(1); - ResultScanner scanner; - try { - scanner = table.getScanner(s1); - int count = 0; - for (Result result : scanner) { - count++; - } - assertEquals("Count the rows", count, 2); - iterator = cache.iterator(); - List newCacheList = new ArrayList<>(); + try (ResultScanner scanner = table.getScanner(s)) { + Thread evictorThread = new Thread() { + @Override + public void run() { + List cacheList = new ArrayList<>(); + Iterator iterator = cache.iterator(); + // evict all the blocks while (iterator.hasNext()) { CachedBlock next = iterator.next(); BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); - newCacheList.add(cacheKey); + cacheList.add(cacheKey); + cache.evictBlock(cacheKey); } - int newBlockRefCount = 0; - for (BlockCacheKey key : cacheList) { - if (newCacheList.contains(key)) { - newBlockRefCount++; - } + try { + Thread.sleep(1); + } catch (InterruptedException e1) { + } + iterator = cache.iterator(); + int refBlockCount = 0; + while (iterator.hasNext()) { + iterator.next(); + refBlockCount++; } + assertEquals("One block should be there ", refBlockCount, 1); + // Rescan to prepopulate the data + // cache this row. + Scan s1 = new Scan(); + // This scan will start from ROW1 and it will populate the cache with a + // row that is lower than ROW3. + s1.withStartRow(ROW3); + s1.withStopRow(ROW5); + s1.setCaching(1); + ResultScanner scanner; + try { + scanner = table.getScanner(s1); + int count = Iterables.size(scanner); + assertEquals("Count the rows", count, 2); + iterator = cache.iterator(); + List newCacheList = new ArrayList<>(); + while (iterator.hasNext()) { + CachedBlock next = iterator.next(); + BlockCacheKey cacheKey = new BlockCacheKey(next.getFilename(), next.getOffset()); + newCacheList.add(cacheKey); + } + int newBlockRefCount = 0; + for (BlockCacheKey key : cacheList) { + if (newCacheList.contains(key)) { + newBlockRefCount++; + } + } - assertEquals("old blocks should still be found ", newBlockRefCount, 6); - latch.countDown(); + assertEquals("old blocks should still be found ", newBlockRefCount, 6); + latch.countDown(); - } catch (IOException e) { + } catch (IOException e) { + } + } + }; + count = 0; + while (scanner.next() != null) { + count++; + if (count == 2) { + evictorThread.start(); + latch.await(); } - } - }; - count = 0; - for (Result result : scanner) { - count++; - if (count == 2) { - evictorThread.start(); - latch.await(); } } assertEquals("Count should give all rows ", count, 10); http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index d887e7b..85d84de 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -39,14 +39,10 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.NavigableMap; -import java.util.NavigableSet; -import java.util.Optional; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.lang3.ArrayUtils; @@ -57,7 +53,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.ClusterStatus.Option; import org.apache.hadoop.hbase.CompareOperator; -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; @@ -74,11 +69,6 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; -import org.apache.hadoop.hbase.coprocessor.ObserverContext; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; -import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; -import org.apache.hadoop.hbase.coprocessor.RegionObserver; -import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; @@ -104,15 +94,10 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; -import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; -import org.apache.hadoop.hbase.regionserver.ScanInfo; -import org.apache.hadoop.hbase.regionserver.Store; -import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -540,147 +525,6 @@ public class TestFromClientSide { assertEquals(rowCount - endKeyCount, countGreater); } - /** - * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will - * fail with an IOException() on the first call. - */ - public static class ExceptionInReseekRegionObserver implements RegionCoprocessor, RegionObserver { - static AtomicLong reqCount = new AtomicLong(0); - static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE - static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once - - static void reset() { - reqCount.set(0); - isDoNotRetry.set(false); - throwOnce.set(true); - } - - @Override - public Optional getRegionObserver() { - return Optional.of(this); - } - - class MyStoreScanner extends StoreScanner { - public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, - long readPt) throws IOException { - super(store, scanInfo, scan, columns, readPt); - } - - @Override - protected List selectScannersFrom(HStore store, - List allScanners) { - List scanners = super.selectScannersFrom(store, allScanners); - List newScanners = new ArrayList<>(scanners.size()); - for (KeyValueScanner scanner : scanners) { - newScanners.add(new DelegatingKeyValueScanner(scanner) { - @Override - public boolean reseek(Cell key) throws IOException { - reqCount.incrementAndGet(); - if (!throwOnce.get()|| reqCount.get() == 1) { - if (isDoNotRetry.get()) { - throw new DoNotRetryIOException("Injected exception"); - } else { - throw new IOException("Injected exception"); - } - } - return super.reseek(key); - } - }); - } - return newScanners; - } - } - - @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, - final long readPt) throws IOException { - HStore hs = (HStore) store; - return new MyStoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt); - } - } - - /** - * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek - * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the - * ClientScanner does not get an exception and also sees all the data. - * @throws IOException - * @throws InterruptedException - */ - @Test - public void testClientScannerIsResetWhenScanThrowsIOException() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.throwOnce.set(true); // throw exceptions only once - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - assertEquals(rowCount, actualRowCount); - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0); - } - - /** - * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation - * is that the exception will bubble up to the client scanner instead of being retried. - */ - @Test (timeout = 180000) - public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.isDoNotRetry.set(true); - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - fail("Should have thrown an exception"); - } catch (DoNotRetryIOException expected) { - // expected - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0); - } - - /** - * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation - * is that the we will keep on retrying, but fail after the retries are exhausted instead of - * retrying indefinitely. - */ - @Test (timeout = 180000) - public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() - throws IOException, InterruptedException { - TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); - final TableName tableName = TableName.valueOf(name.getMethodName()); - TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); - HTableDescriptor htd = TEST_UTIL.createTableDescriptor(tableName, FAMILY); - htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); - TEST_UTIL.getAdmin().createTable(htd); - ExceptionInReseekRegionObserver.reset(); - ExceptionInReseekRegionObserver.throwOnce.set(false); // throw exceptions in every retry - try (Table t = TEST_UTIL.getConnection().getTable(tableName)) { - TEST_UTIL.loadTable(t, FAMILY, false); - TEST_UTIL.getAdmin().flush(tableName); - TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); - fail("Should have thrown an exception"); - } catch (DoNotRetryIOException expected) { - assertTrue(expected instanceof ScannerResetException); - // expected - } - assertTrue(ExceptionInReseekRegionObserver.reqCount.get() >= 3); - } - /* * @param key * @return Scan with RowFilter that does LESS than passed key. http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java new file mode 100644 index 0000000..f18ccc0 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.hamcrest.CoreMatchers.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; +import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.regionserver.ReversedStoreScanner; +import org.apache.hadoop.hbase.regionserver.ScanInfo; +import org.apache.hadoop.hbase.regionserver.StoreScanner; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WAL; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestFromClientSideScanExcpetion { + + protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + + private static int SLAVES = 3; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 6000000); + conf.setClass(HConstants.REGION_IMPL, MyHRegion.class, HRegion.class); + conf.setBoolean("hbase.client.log.scanner.activity", true); + // We need more than one region server in this test + TEST_UTIL.startMiniCluster(SLAVES); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private static AtomicBoolean ON = new AtomicBoolean(false); + private static AtomicLong REQ_COUNT = new AtomicLong(0); + private static AtomicBoolean IS_DO_NOT_RETRY = new AtomicBoolean(false); // whether to throw + // DNRIOE + private static AtomicBoolean THROW_ONCE = new AtomicBoolean(true); // whether to only throw once + + private static void reset() { + ON.set(false); + REQ_COUNT.set(0); + IS_DO_NOT_RETRY.set(false); + THROW_ONCE.set(true); + } + + private static void inject() { + ON.set(true); + } + + public static final class MyHRegion extends HRegion { + + @SuppressWarnings("deprecation") + public MyHRegion(Path tableDir, WAL wal, FileSystem fs, Configuration confParam, + RegionInfo regionInfo, TableDescriptor htd, RegionServerServices rsServices) { + super(tableDir, wal, fs, confParam, regionInfo, htd, rsServices); + } + + @Override + protected HStore instantiateHStore(ColumnFamilyDescriptor family) throws IOException { + return new MyHStore(this, family, conf); + } + } + + public static final class MyHStore extends HStore { + + public MyHStore(HRegion region, ColumnFamilyDescriptor family, Configuration confParam) + throws IOException { + super(region, family, confParam); + } + + @Override + protected KeyValueScanner createScanner(Scan scan, NavigableSet targetCols, long readPt) + throws IOException { + return scan.isReversed() + ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt) + : new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt); + } + } + + public static final class MyStoreScanner extends StoreScanner { + public MyStoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet columns, + long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + @Override + protected List selectScannersFrom(HStore store, + List allScanners) { + List scanners = super.selectScannersFrom(store, allScanners); + List newScanners = new ArrayList<>(scanners.size()); + for (KeyValueScanner scanner : scanners) { + newScanners.add(new DelegatingKeyValueScanner(scanner) { + @Override + public boolean reseek(Cell key) throws IOException { + if (ON.get()) { + REQ_COUNT.incrementAndGet(); + if (!THROW_ONCE.get() || REQ_COUNT.get() == 1) { + if (IS_DO_NOT_RETRY.get()) { + throw new DoNotRetryIOException("Injected exception"); + } else { + throw new IOException("Injected exception"); + } + } + } + return super.reseek(key); + } + }); + } + return newScanners; + } + } + + /** + * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek leaving + * the server side RegionScanner to be in dirty state. The client has to ensure that the + * ClientScanner does not get an exception and also sees all the data. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testClientScannerIsResetWhenScanThrowsIOException() + throws IOException, InterruptedException { + reset(); + THROW_ONCE.set(true); // throw exceptions only once + TableName tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + int actualRowCount = TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + assertEquals(rowCount, actualRowCount); + } + assertTrue(REQ_COUNT.get() > 0); + } + + /** + * Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation + * is that the exception will bubble up to the client scanner instead of being retried. + */ + @Test + public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE() + throws IOException, InterruptedException { + reset(); + IS_DO_NOT_RETRY.set(true); + TableName tableName = TableName.valueOf(name.getMethodName()); + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + fail("Should have thrown an exception"); + } catch (DoNotRetryIOException expected) { + // expected + } + assertTrue(REQ_COUNT.get() > 0); + } + + /** + * Tests the case where a coprocessor throws a regular IOException in the scan. The expectation is + * that the we will keep on retrying, but fail after the retries are exhausted instead of retrying + * indefinitely. + */ + @Test + public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE() + throws IOException, InterruptedException { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3); + TableName tableName = TableName.valueOf(name.getMethodName()); + reset(); + THROW_ONCE.set(false); // throw exceptions in every retry + try (Table t = TEST_UTIL.createTable(tableName, FAMILY)) { + TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getAdmin().flush(tableName); + inject(); + TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + fail("Should have thrown an exception"); + } catch (DoNotRetryIOException expected) { + assertThat(expected, instanceOf(ScannerResetException.class)); + // expected + } + assertTrue(REQ_COUNT.get() >= 3); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java new file mode 100644 index 0000000..3d50ec7 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetionWithCoprocessor.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + +/** + * Test all client operations with a coprocessor that just implements the default flush/compact/scan + * policy. + */ +@Category({ MediumTests.class, ClientTests.class }) +public class TestFromClientSideScanExcpetionWithCoprocessor + extends TestFromClientSideScanExcpetion { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + Configuration conf = TEST_UTIL.getConfiguration(); + conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, + MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName()); + TestFromClientSideScanExcpetion.setUpBeforeClass(); + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index ee94645..91af2b7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -82,11 +82,9 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPreClose = new AtomicInteger(0); final AtomicInteger ctPostClose = new AtomicInteger(0); final AtomicInteger ctPreFlush = new AtomicInteger(0); - final AtomicInteger ctPreFlushScannerOpen = new AtomicInteger(0); final AtomicInteger ctPostFlush = new AtomicInteger(0); final AtomicInteger ctPreCompactSelect = new AtomicInteger(0); final AtomicInteger ctPostCompactSelect = new AtomicInteger(0); - final AtomicInteger ctPreCompactScanner = new AtomicInteger(0); final AtomicInteger ctPreCompact = new AtomicInteger(0); final AtomicInteger ctPostCompact = new AtomicInteger(0); final AtomicInteger ctPreGet = new AtomicInteger(0); @@ -114,7 +112,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { final AtomicInteger ctPreScannerClose = new AtomicInteger(0); final AtomicInteger ctPostScannerClose = new AtomicInteger(0); final AtomicInteger ctPreScannerOpen = new AtomicInteger(0); - final AtomicInteger ctPreStoreScannerOpen = new AtomicInteger(0); final AtomicInteger ctPostScannerOpen = new AtomicInteger(0); final AtomicInteger ctPreBulkLoadHFile = new AtomicInteger(0); final AtomicInteger ctPostBulkLoadHFile = new AtomicInteger(0); @@ -181,14 +178,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { } @Override - public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - ctPreFlushScannerOpen.incrementAndGet(); - return s; - } - - @Override public void postFlush(ObserverContext c, Store store, StoreFile resultFile) throws IOException { ctPostFlush.incrementAndGet(); @@ -223,15 +212,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - ctPreCompactScanner.incrementAndGet(); - return s; - } - - @Override public void postCompact(ObserverContext c, Store store, StoreFile resultFile, CompactionLifeCycleTracker tracker, CompactionRequest request) throws IOException { @@ -251,14 +231,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - ctPreStoreScannerOpen.incrementAndGet(); - return s; - } - - @Override public RegionScanner postScannerOpen(final ObserverContext c, final Scan scan, final RegionScanner s) throws IOException { @@ -830,10 +802,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPreFlush.get(); } - public int getCtPreFlushScannerOpen() { - return ctPreFlushScannerOpen.get(); - } - public int getCtPostFlush() { return ctPostFlush.get(); } @@ -846,10 +814,6 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { return ctPostCompactSelect.get(); } - public int getCtPreCompactScanner() { - return ctPreCompactScanner.get(); - } - public int getCtPreCompact() { return ctPreCompact.get(); } http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 4448f9d..4d6bfec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -123,12 +124,15 @@ public class TestRegionObserverScannerOpenHook { } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPt) - throws IOException { - scan.setFilter(new NoDataFilter()); - HStore hs = (HStore) store; - return new StoreScanner(hs, hs.getScanInfo(), scan, targetCols, readPt); + public void preGetOp(ObserverContext c, Get get, + List result) throws IOException { + c.bypass(); + } + + @Override + public RegionScanner preScannerOpen(ObserverContext c, Scan scan, + RegionScanner s) throws IOException { + return c.getEnvironment().getRegion().getScanner(scan.setFilter(new NoDataFilter())); } } @@ -152,10 +156,8 @@ public class TestRegionObserverScannerOpenHook { } @Override - public InternalScanner preFlushScannerOpen(ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - scanners.forEach(KeyValueScanner::close); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { return NO_DATA; } } @@ -171,12 +173,9 @@ public class TestRegionObserverScannerOpenHook { } @Override - public InternalScanner preCompactScannerOpen(ObserverContext c, - Store store, List scanners, ScanType scanType, - long earliestPutTs, InternalScanner s, CompactionLifeCycleTracker tracker, - CompactionRequest request, long readPoint) - throws IOException { - scanners.forEach(KeyValueScanner::close); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { return NO_DATA; } } http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java new file mode 100644 index 0000000..ad733d1 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingInternalScanner.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.yetus.audience.InterfaceAudience; + +@InterfaceAudience.Private +public class DelegatingInternalScanner implements InternalScanner { + + protected final InternalScanner scanner; + + public DelegatingInternalScanner(InternalScanner scanner) { + this.scanner = scanner; + } + + @Override + public boolean next(List result, ScannerContext scannerContext) throws IOException { + return scanner.next(result, scannerContext); + } + + @Override + public void close() throws IOException { + scanner.close(); + } + +} http://git-wip-us.apache.org/repos/asf/hbase/blob/e804dd0b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index 2b98cf2..cdad850 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -19,12 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.List; -import java.util.NavigableSet; import java.util.Optional; -import java.util.OptionalInt; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor; import org.apache.hadoop.hbase.coprocessor.ObserverContext; @@ -35,11 +31,10 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; /** - * RegionObserver that just reimplements the default behavior, - * in order to validate that all the necessary APIs for this are public - * This observer is also used in {@link TestFromClientSideWithCoprocessor} and - * {@link TestCompactionWithCoprocessor} to make sure that a wide range - * of functionality still behaves as expected. + * RegionObserver that just reimplements the default behavior, in order to validate that all the + * necessary APIs for this are public This observer is also used in + * {@link TestFromClientSideWithCoprocessor} and {@link TestCompactionWithCoprocessor} to make sure + * that a wide range of functionality still behaves as expected. */ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver { @@ -48,49 +43,22 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver return Optional.of(this); } - /** - * Reimplement the default behavior - */ @Override - public InternalScanner preFlushScannerOpen(final ObserverContext c, - Store store, List scanners, InternalScanner s, long readPoint) - throws IOException { - HStore hs = (HStore) store; - ScanInfo oldSI = hs.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), - oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners, - ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP); + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner) throws IOException { + return new DelegatingInternalScanner(scanner); } - /** - * Reimplement the default behavior - */ @Override - public InternalScanner preCompactScannerOpen( - final ObserverContext c, Store store, - List scanners, ScanType scanType, long earliestPutTs, - InternalScanner s, CompactionLifeCycleTracker tracker, CompactionRequest request, - long readPoint) throws IOException { - HStore hs = (HStore) store; - // this demonstrates how to override the scanners default behavior - ScanInfo oldSI = hs.getScanInfo(); - ScanInfo scanInfo = new ScanInfo(oldSI.getConfiguration(), store.getColumnFamilyDescriptor(), - oldSI.getTtl(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); - return new StoreScanner(hs, scanInfo, OptionalInt.empty(), scanners, scanType, - store.getSmallestReadPoint(), earliestPutTs); + public InternalScanner preCompact(ObserverContext c, Store store, + InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker, + CompactionRequest request) throws IOException { + return new DelegatingInternalScanner(scanner); } @Override - public KeyValueScanner preStoreScannerOpen(ObserverContext c, - Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s, long readPoint) - throws IOException { - HStore hs = (HStore) store; - Region r = c.getEnvironment().getRegion(); - return scan.isReversed() - ? new ReversedStoreScanner(hs, hs.getScanInfo(), scan, targetCols, - r.getReadPoint(scan.getIsolationLevel())) - : new StoreScanner(hs, hs.getScanInfo(), scan, targetCols, - r.getReadPoint(scan.getIsolationLevel())); + public RegionScanner preScannerOpen(ObserverContext c, Scan scan, + RegionScanner s) throws IOException { + return c.getEnvironment().getRegion().getScanner(scan); } }