Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2DFF5200BC7 for ; Fri, 11 Nov 2016 06:40:22 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 2C17B160B10; Fri, 11 Nov 2016 05:40:22 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 22FF9160B01 for ; Fri, 11 Nov 2016 06:40:20 +0100 (CET) Received: (qmail 48082 invoked by uid 500); 11 Nov 2016 05:40:20 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 48053 invoked by uid 99); 11 Nov 2016 05:40:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 05:40:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A76CFE00E5; Fri, 11 Nov 2016 05:40:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: anoopsamjohn@apache.org To: commits@hbase.apache.org Message-Id: <40195fc626d04c7cbb24577fd1678041@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-16962: Add readPoint to preCompactScannerOpen() and preFlushScannerOpen() API Date: Fri, 11 Nov 2016 05:40:19 +0000 (UTC) archived-at: Fri, 11 Nov 2016 05:40:22 -0000 Repository: hbase Updated Branches: refs/heads/branch-1 18b31fdd3 -> 44ab659b9 HBASE-16962: Add readPoint to preCompactScannerOpen() and preFlushScannerOpen() API Signed-off-by: anoopsamjohn Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44ab659b Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44ab659b Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44ab659b Branch: refs/heads/branch-1 Commit: 44ab659b933afed2df323b031181fbcb52c85b61 Parents: 18b31fd Author: thiruvel Authored: Wed Nov 9 11:02:41 2016 -0800 Committer: anoopsamjohn Committed: Fri Nov 11 11:09:55 2016 +0530 ---------------------------------------------------------------------- .../hbase/coprocessor/BaseRegionObserver.java | 14 +++++ .../hbase/coprocessor/RegionObserver.java | 60 ++++++++++++++++++-- .../regionserver/RegionCoprocessorHost.java | 13 +++-- .../hadoop/hbase/regionserver/StoreFlusher.java | 3 +- .../regionserver/compactions/Compactor.java | 10 ++-- 5 files changed, 85 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java index eb2fc28..a0a91bc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java @@ -103,6 +103,13 @@ public class BaseRegionObserver implements RegionObserver { } @Override + public InternalScanner preFlushScannerOpen(final ObserverContext c, + final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final long readPoint) throws IOException { + return preFlushScannerOpen(c, store, memstoreScanner, s); + } + + @Override public void preFlush(ObserverContext e) throws IOException { } @@ -212,6 +219,13 @@ public class BaseRegionObserver implements RegionObserver { } @Override + public InternalScanner preCompactScannerOpen(ObserverContext c, + Store store, List scanners, ScanType scanType, long earliestPutTs, + InternalScanner s, CompactionRequest request, long readPoint) throws IOException { + return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s, request); + } + + @Override public void postCompact(ObserverContext e, final Store store, final StoreFile resultFile) throws IOException { } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 42d5cdb..7cd1be5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -112,12 +112,35 @@ public interface RegionObserver extends Coprocessor { * @return the scanner to use during the flush. {@code null} if the default implementation * is to be used. * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, + * InternalScanner, long)} */ + @Deprecated InternalScanner preFlushScannerOpen(final ObserverContext c, final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s) throws IOException; /** + * Called before a memstore is flushed to disk and prior to creating the scanner to read from + * the memstore. To override or modify how a memstore is flushed, + * implementing classes can return a new scanner to provide the KeyValues to be + * stored into the new {@code StoreFile} or null to perform the default processing. + * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * @param c the environment provided by the region server + * @param store the store being flushed + * @param memstoreScanner the scanner for the memstore that is flushed + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param readPoint the readpoint to create scanner + * @return the scanner to use during the flush. {@code null} if the default implementation + * is to be used. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preFlushScannerOpen(final ObserverContext c, + final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s, + final long readPoint) throws IOException; + + /** * Called before the memstore is flushed to disk. * @param c the environment provided by the region server * @throws IOException if an error occurred on the coprocessor @@ -283,7 +306,10 @@ public interface RegionObserver extends Coprocessor { * @return the scanner to use during compaction. {@code null} if the default implementation is to * be used. * @throws IOException if an error occurred on the coprocessor + * @deprecated Use {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, + * InternalScanner, CompactionRequest, long)} instead. */ + @Deprecated InternalScanner preCompactScannerOpen(final ObserverContext c, final Store store, List scanners, final ScanType scanType, final long earliestPutTs, final InternalScanner s, CompactionRequest request) @@ -304,12 +330,38 @@ public interface RegionObserver extends Coprocessor { * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store * files * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain + * @param request compaction request + * @param readPoint the readpoint to create scanner + * @return the scanner to use during compaction. {@code null} if the default implementation is to + * be used. + * @throws IOException if an error occurred on the coprocessor + */ + InternalScanner preCompactScannerOpen(final ObserverContext c, + final Store store, List scanners, final ScanType scanType, + final long earliestPutTs, final InternalScanner s, final CompactionRequest request, + final long readPoint) throws IOException; + + /** + * Called prior to writing the {@link StoreFile}s selected for compaction into a new + * {@code StoreFile} and prior to creating the scanner used to read the input files. To override + * or modify the compaction process, implementing classes can return a new scanner to provide the + * KeyValues to be stored into the new {@code StoreFile} or null to perform the default + * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no + * effect in this hook. + * @param c the environment provided by the region server + * @param store the store being compacted + * @param scanners the list {@link org.apache.hadoop.hbase.regionserver.StoreFileScanner}s + * to be read from + * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction + * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store + * files + * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain * @return the scanner to use during compaction. {@code null} if the default implementation is to * be used. * @throws IOException if an error occurred on the coprocessor * @deprecated Use * {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, - * InternalScanner, CompactionRequest)} instead. + * InternalScanner, CompactionRequest, long)} instead. */ @Deprecated InternalScanner preCompactScannerOpen(final ObserverContext c, @@ -983,9 +1035,9 @@ public interface RegionObserver extends Coprocessor { * Called before a store opens a new scanner. * This hook is called when a "user" scanner is opened. *

- * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)} - * and {@link #preCompactScannerOpen(ObserverContext, - * Store, List, ScanType, long, InternalScanner)} + * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner, + * long)} and {@link #preCompactScannerOpen(ObserverContext, + * Store, List, ScanType, long, InternalScanner, CompactionRequest, long)} * to override scanners created for flushes or compactions, resp. *

* Call CoprocessorEnvironment#complete to skip any subsequent chained http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 28d2129..c564091 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -520,18 +520,19 @@ public class RegionCoprocessorHost /** * See - * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)} + * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, + * InternalScanner, CompactionRequest, long)} */ public InternalScanner preCompactScannerOpen(final Store store, final List scanners, final ScanType scanType, final long earliestPutTs, - final CompactionRequest request) throws IOException { + final CompactionRequest request, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ? null : new RegionOperationWithResult() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, - earliestPutTs, getResult(), request)); + earliestPutTs, getResult(), request, readPoint)); } }); } @@ -649,16 +650,16 @@ public class RegionCoprocessorHost /** * See * {@link RegionObserver#preFlushScannerOpen(ObserverContext, - * Store, KeyValueScanner, InternalScanner)} + * Store, KeyValueScanner, InternalScanner, long)} */ public InternalScanner preFlushScannerOpen(final Store store, - final KeyValueScanner memstoreScanner) throws IOException { + final KeyValueScanner memstoreScanner, final long readPoint) throws IOException { return execOperationWithResult(null, coprocessors.isEmpty() ? null : new RegionOperationWithResult() { @Override public void call(RegionObserver oserver, ObserverContext ctx) throws IOException { - setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult())); + setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult(), readPoint)); } }); } http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 9b182a2..019e1cc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -83,7 +83,8 @@ abstract class StoreFlusher { long smallestReadPoint) throws IOException { InternalScanner scanner = null; if (store.getCoprocessorHost() != null) { - scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner); + scanner = store.getCoprocessorHost().preFlushScannerOpen(store, snapshotScanner, + smallestReadPoint); } if (scanner == null) { Scan scan = new Scan(); http://git-wip-us.apache.org/repos/asf/hbase/blob/44ab659b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index efb5fb2..d1ab800 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -286,7 +286,8 @@ public abstract class Compactor { try { /* Include deletes, unless we are doing a major compaction */ ScanType scanType = scannerFactory.getScanType(request); - scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user); + scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners, user, + smallestReadPoint); if (scanner == null) { scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint); } @@ -337,24 +338,25 @@ public abstract class Compactor { * @param earliestPutTs Earliest put ts. * @param scanners File scanners for compaction files. * @param user the User + * @param readPoint the read point to help create scanner by Coprocessor if required. * @return Scanner override by coprocessor; null if not overriding. */ protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, final ScanType scanType, final long earliestPutTs, final List scanners, - User user) throws IOException { + User user, final long readPoint) throws IOException { if (store.getCoprocessorHost() == null) { return null; } if (user == null) { return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, scanType, - earliestPutTs, request); + earliestPutTs, request, readPoint); } else { try { return user.getUGI().doAs(new PrivilegedExceptionAction() { @Override public InternalScanner run() throws Exception { return store.getCoprocessorHost().preCompactScannerOpen(store, scanners, - scanType, earliestPutTs, request); + scanType, earliestPutTs, request, readPoint); } }); } catch (InterruptedException ie) {