Return-Path: X-Original-To: apmail-hbase-commits-archive@www.apache.org Delivered-To: apmail-hbase-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BE6C318106 for ; Thu, 13 Aug 2015 02:47:20 +0000 (UTC) Received: (qmail 87797 invoked by uid 500); 13 Aug 2015 02:47:20 -0000 Delivered-To: apmail-hbase-commits-archive@hbase.apache.org Received: (qmail 87754 invoked by uid 500); 13 Aug 2015 02:47:20 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 87745 invoked by uid 99); 13 Aug 2015 02:47:20 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Aug 2015 02:47:20 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 48C85E0332; Thu, 13 Aug 2015 02:47:20 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eclark@apache.org To: commits@hbase.apache.org Message-Id: <3b9a55a8981c44cf9fd39cc24469ba7b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-14098 Allow dropping caches behind compactions Date: Thu, 13 Aug 2015 02:47:20 +0000 (UTC) Repository: hbase Updated Branches: refs/heads/branch-1.2 222102196 -> 34b706af4 HBASE-14098 Allow dropping caches behind compactions Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34b706af Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34b706af Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34b706af Branch: refs/heads/branch-1.2 Commit: 34b706af4d44ad7dff8ac5f35eec304d7dc0ccab Parents: 2221021 Author: Elliott Clark Authored: Wed Aug 12 14:32:48 2015 -0700 Committer: Elliott Clark Committed: Wed Aug 12 19:43:16 2015 -0700 ---------------------------------------------------------------------- .../hbase/io/FSDataInputStreamWrapper.java | 22 +++++++++++++---- .../hadoop/hbase/io/hfile/CacheConfig.java | 19 +++++++++++---- .../org/apache/hadoop/hbase/io/hfile/HFile.java | 12 ++++++++++ .../hbase/regionserver/DefaultStoreFlusher.java | 7 ++++-- .../hadoop/hbase/regionserver/HStore.java | 22 ++++++++++++----- .../apache/hadoop/hbase/regionserver/Store.java | 25 ++++++++++++++++++-- .../hadoop/hbase/regionserver/StoreFile.java | 24 ++++++++++++++----- .../hbase/regionserver/StoreFileInfo.java | 11 +++++---- .../hbase/regionserver/StoreFileScanner.java | 11 +++++---- .../hbase/regionserver/StripeStoreFlusher.java | 6 ++++- .../regionserver/compactions/Compactor.java | 10 ++++++-- .../compactions/DefaultCompactor.java | 16 +++++++++---- .../compactions/StripeCompactor.java | 8 ++++--- .../hadoop/hbase/io/hfile/TestCacheOnWrite.java | 3 ++- .../hbase/regionserver/TestFSErrorsExposed.java | 2 +- .../regionserver/TestReversibleScanners.java | 8 +++---- .../hbase/regionserver/TestStripeCompactor.java | 3 ++- .../compactions/TestStripeCompactionPolicy.java | 3 ++- 18 files changed, 160 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java index 5950585..b06be6b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java @@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.fs.HFileSystem; -import org.apache.hadoop.hbase.io.FileLink; import com.google.common.annotations.VisibleForTesting; @@ -76,14 +75,23 @@ public class FSDataInputStreamWrapper { private volatile int hbaseChecksumOffCount = -1; public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException { - this(fs, null, path); + this(fs, null, path, false); + } + + public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException { + this(fs, null, path, dropBehind); } public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException { - this(fs, link, null); + this(fs, link, null, false); + } + public FSDataInputStreamWrapper(FileSystem fs, FileLink link, + boolean dropBehind) throws IOException { + this(fs, link, null, dropBehind); } - private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException { + private FSDataInputStreamWrapper(FileSystem fs, FileLink link, + Path path, boolean dropBehind) throws IOException { assert (path == null) != (link == null); this.path = path; this.link = link; @@ -96,8 +104,14 @@ public class FSDataInputStreamWrapper { // Initially we are going to read the tail block. Open the reader w/FS checksum. this.useHBaseChecksumConfigured = this.useHBaseChecksum = false; this.stream = (link != null) ? link.open(hfs) : hfs.open(path); + try { + this.stream.setDropBehind(dropBehind); + } catch (Exception e) { + // Skipped. + } } + /** * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any * reads finish and before any other reads start (what happens in reality is we read the http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java index 26eb1da..ee2d001 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java @@ -131,6 +131,8 @@ public class CacheConfig { private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false; private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class"; + private static final String DROP_BEHIND_CACHE_COMPACTION_KEY="hbase.hfile.drop.behind.compaction"; + private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true; /** * Enum of all built in external block caches. @@ -194,6 +196,8 @@ public class CacheConfig { */ private boolean cacheDataInL1; + private final boolean dropBehindCompaction; + /** * Create a cache configuration using the specified configuration object and * family descriptor. @@ -218,7 +222,8 @@ public class CacheConfig { conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, - HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1() + HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(), + conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT) ); } @@ -239,7 +244,8 @@ public class CacheConfig { conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED), conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN), conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1, - HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) + HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1), + conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT) ); } @@ -264,7 +270,7 @@ public class CacheConfig { final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite, final boolean cacheBloomsOnWrite, final boolean evictOnClose, final boolean cacheDataCompressed, final boolean prefetchOnOpen, - final boolean cacheDataInL1) { + final boolean cacheDataInL1, final boolean dropBehindCompaction) { this.blockCache = blockCache; this.cacheDataOnRead = cacheDataOnRead; this.inMemory = inMemory; @@ -275,6 +281,7 @@ public class CacheConfig { this.cacheDataCompressed = cacheDataCompressed; this.prefetchOnOpen = prefetchOnOpen; this.cacheDataInL1 = cacheDataInL1; + this.dropBehindCompaction = dropBehindCompaction; LOG.info(this); } @@ -287,7 +294,7 @@ public class CacheConfig { cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite, cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose, cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen, - cacheConf.cacheDataInL1); + cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction); } /** @@ -314,6 +321,10 @@ public class CacheConfig { return isBlockCacheEnabled() && cacheDataOnRead; } + public boolean shouldDropBehindCompaction() { + return dropBehindCompaction; + } + /** * Should we cache a block of a particular category? We always cache * important blocks such as index blocks, as long as the block cache is http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java index 0b06c33..6741957 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java @@ -248,6 +248,7 @@ public class HFile { protected KVComparator comparator = KeyValue.COMPARATOR; protected InetSocketAddress[] favoredNodes; private HFileContext fileContext; + protected boolean shouldDropBehind = false; WriterFactory(Configuration conf, CacheConfig cacheConf) { this.conf = conf; @@ -285,6 +286,12 @@ public class HFile { return this; } + public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) { + this.shouldDropBehind = shouldDropBehind; + return this; + } + + public Writer create() throws IOException { if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) { throw new AssertionError("Please specify exactly one of " + @@ -292,6 +299,11 @@ public class HFile { } if (path != null) { ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes); + try { + ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction()); + } catch (UnsupportedOperationException uoe) { + LOG.debug("Unable to set drop behind on " + path, uoe); + } } return createWriter(fs, path, ostream, comparator, fileContext); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 474a44a..da89129 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -63,8 +63,11 @@ public class DefaultStoreFlusher extends StoreFlusher { synchronized (flushLock) { status.setStatus("Flushing " + store + ": creating writer"); // Write the map out to the disk - writer = store.createWriterInTmp( - cellsCount, store.getFamily().getCompression(), false, true, snapshot.isTagsPresent()); + writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(), + /* isCompaction = */ false, + /* includeMVCCReadpoint = */ true, + /* includesTags = */ snapshot.isTagsPresent(), + /* shouldDropBehind = */ false); writer.setTimeRangeTracker(snapshot.getTimeRangeTracker()); IOException e = null; try { http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 0c6b2f0..e15db38 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -966,6 +966,15 @@ public class HStore implements Store { return sf; } + @Override + public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, + boolean isCompaction, boolean includeMVCCReadpoint, + boolean includesTag) + throws IOException { + return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint, + includesTag, false); + } + /* * @param maxKeyCount * @param compression Compression algorithm to use @@ -976,7 +985,8 @@ public class HStore implements Store { */ @Override public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression, - boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag) + boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag, + boolean shouldDropBehind) throws IOException { final CacheConfig writerCacheConf; if (isCompaction) { @@ -1001,6 +1011,7 @@ public class HStore implements Store { .withMaxKeyCount(maxKeyCount) .withFavoredNodes(favoredNodes) .withFileContext(hFileContext) + .withShouldDropCacheBehind(shouldDropBehind) .build(); return w; } @@ -1102,9 +1113,8 @@ public class HStore implements Store { // TODO this used to get the store files in descending order, // but now we get them in ascending order, which I think is // actually more correct, since memstore get put at the end. - List sfScanners = StoreFileScanner - .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher, - readPt); + List sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan, + cacheBlocks, usePread, isCompaction, false, matcher, readPt); List scanners = new ArrayList(sfScanners.size()+1); scanners.addAll(sfScanners); @@ -1176,7 +1186,7 @@ public class HStore implements Store { CompactionThroughputController throughputController) throws IOException { assert compaction != null; List sfs = null; - CompactionRequest cr = compaction.getRequest();; + CompactionRequest cr = compaction.getRequest(); try { // Do all sanity checking in here if we have a valid CompactionRequest // because we need to clean up after it on the way out in a finally @@ -2151,7 +2161,7 @@ public class HStore implements Store { return new StoreFlusherImpl(cacheFlushId); } - private class StoreFlusherImpl implements StoreFlushContext { + private final class StoreFlusherImpl implements StoreFlushContext { private long cacheFlushSeqNum; private MemStoreSnapshot snapshot; http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index da2cb10..5a13ba8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -159,11 +159,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf FileSystem getFileSystem(); - /* + + /** + * @param maxKeyCount + * @param compression Compression algorithm to use + * @param isCompaction whether we are creating a new file in a compaction + * @param includeMVCCReadpoint whether we should out the MVCC readpoint + * @return Writer for a new StoreFile in the tmp dir. + */ + StoreFile.Writer createWriterInTmp( + long maxKeyCount, + Compression.Algorithm compression, + boolean isCompaction, + boolean includeMVCCReadpoint, + boolean includesTags + ) throws IOException; + + /** * @param maxKeyCount * @param compression Compression algorithm to use * @param isCompaction whether we are creating a new file in a compaction * @param includeMVCCReadpoint whether we should out the MVCC readpoint + * @param shouldDropBehind should the writer drop caches behind writes * @return Writer for a new StoreFile in the tmp dir. */ StoreFile.Writer createWriterInTmp( @@ -171,9 +188,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf Compression.Algorithm compression, boolean isCompaction, boolean includeMVCCReadpoint, - boolean includesTags + boolean includesTags, + boolean shouldDropBehind ) throws IOException; + + + // Compaction oriented methods boolean throttleCompaction(long compactionSize); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index dd86a5d..acd4233 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -247,8 +247,8 @@ public class StoreFile { } /** - * @return True if this is a StoreFile Reference; call after {@link #open()} - * else may get wrong answer. + * @return True if this is a StoreFile Reference; call + * after {@link #open(boolean canUseDropBehind)} else may get wrong answer. */ public boolean isReference() { return this.fileInfo.isReference(); @@ -366,13 +366,13 @@ public class StoreFile { * @throws IOException * @see #closeReader(boolean) */ - private Reader open() throws IOException { + private Reader open(boolean canUseDropBehind) throws IOException { if (this.reader != null) { throw new IllegalAccessError("Already open"); } // Open the StoreFile.Reader - this.reader = fileInfo.open(this.fs, this.cacheConf); + this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind); // Load up indices and fileinfo. This also loads Bloom filter type. metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo()); @@ -462,14 +462,18 @@ public class StoreFile { return this.reader; } + public Reader createReader() throws IOException { + return createReader(false); + } + /** * @return Reader for StoreFile. creates if necessary * @throws IOException */ - public Reader createReader() throws IOException { + public Reader createReader(boolean canUseDropBehind) throws IOException { if (this.reader == null) { try { - this.reader = open(); + this.reader = open(canUseDropBehind); } catch (IOException e) { try { this.closeReader(true); @@ -546,6 +550,8 @@ public class StoreFile { private Path filePath; private InetSocketAddress[] favoredNodes; private HFileContext fileContext; + private boolean shouldDropCacheBehind = false; + public WriterBuilder(Configuration conf, CacheConfig cacheConf, FileSystem fs) { this.conf = conf; @@ -611,6 +617,11 @@ public class StoreFile { this.fileContext = fileContext; return this; } + + public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) { + this.shouldDropCacheBehind = shouldDropCacheBehind; + return this; + } /** * Create a store file writer. Client is responsible for closing file when * done. If metadata, add BEFORE closing using @@ -1253,6 +1264,7 @@ public class StoreFile { case ROWCOL: key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col, colOffset, colLen); + break; default: http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index 6516a3e..54f200f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -218,21 +218,24 @@ public class StoreFileInfo { * @return The StoreFile.Reader for the file */ public StoreFile.Reader open(final FileSystem fs, - final CacheConfig cacheConf) throws IOException { + final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException { FSDataInputStreamWrapper in; FileStatus status; + final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction(); if (this.link != null) { // HFileLink - in = new FSDataInputStreamWrapper(fs, this.link); + in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind); status = this.link.getFileStatus(fs); } else if (this.reference != null) { // HFile Reference Path referencePath = getReferredToFile(this.getPath()); - in = new FSDataInputStreamWrapper(fs, referencePath); + in = new FSDataInputStreamWrapper(fs, referencePath, + doDropBehind); status = fs.getFileStatus(referencePath); } else { - in = new FSDataInputStreamWrapper(fs, this.getPath()); + in = new FSDataInputStreamWrapper(fs, this.getPath(), + doDropBehind); status = fs.getFileStatus(initialPath); } long length = status.getLen(); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 961352d..dc22931 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -90,7 +90,7 @@ public class StoreFileScanner implements KeyValueScanner { boolean cacheBlocks, boolean usePread, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, - usePread, false, readPt); + usePread, false, false, readPt); } /** @@ -98,9 +98,9 @@ public class StoreFileScanner implements KeyValueScanner { */ public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, long readPt) throws IOException { + boolean isCompaction, boolean useDropBehind, long readPt) throws IOException { return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, - null, readPt); + useDropBehind, null, readPt); } /** @@ -110,11 +110,12 @@ public class StoreFileScanner implements KeyValueScanner { */ public static List getScannersForStoreFiles( Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException { + boolean isCompaction, boolean canUseDrop, + ScanQueryMatcher matcher, long readPt) throws IOException { List scanners = new ArrayList( files.size()); for (StoreFile file : files) { - StoreFile.Reader r = file.createReader(); + StoreFile.Reader r = file.createReader(canUseDrop); StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread, isCompaction, readPt); scanner.setScanQueryMatcher(matcher); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index 136934c..37e7402 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -109,7 +109,11 @@ public class StripeStoreFlusher extends StoreFlusher { @Override public Writer createWriter() throws IOException { StoreFile.Writer writer = store.createWriterInTmp( - kvCount, store.getFamily().getCompression(), false, true, true); + kvCount, store.getFamily().getCompression(), + /* isCompaction = */ false, + /* includeMVCCReadpoint = */ true, + /* includesTags = */ true, + /* shouldDropBehind = */ false); writer.setTimeRangeTracker(tracker); return writer; } http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java index 2c34c70..a515b87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java @@ -185,8 +185,14 @@ public abstract class Compactor { * @return Scanners. */ protected List createFileScanners( - final Collection filesToCompact, long smallestReadPoint) throws IOException { - return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true, + final Collection filesToCompact, + long smallestReadPoint, + boolean useDropBehind) throws IOException { + return StoreFileScanner.getScannersForStoreFiles(filesToCompact, + /* cache blocks = */ false, + /* use pread = */ false, + /* is compaction */ true, + /* use Drop Behind */ useDropBehind, smallestReadPoint); } http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java index bc8dd01..ed441d3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java @@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor { List scanners; Collection readersToClose; - if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) { + if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) { // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles, // HFileFiles, and their readers readersToClose = new ArrayList(request.getFiles().size()); for (StoreFile f : request.getFiles()) { readersToClose.add(new StoreFile(f)); } - scanners = createFileScanners(readersToClose, smallestReadPoint); + scanners = createFileScanners(readersToClose, smallestReadPoint, + store.throttleCompaction(request.getSize())); } else { readersToClose = Collections.emptyList(); - scanners = createFileScanners(request.getFiles(), smallestReadPoint); + scanners = createFileScanners(request.getFiles(), smallestReadPoint, + store.throttleCompaction(request.getSize())); } StoreFile.Writer writer = null; @@ -81,6 +83,7 @@ public class DefaultCompactor extends Compactor { InternalScanner scanner = null; try { /* Include deletes, unless we are doing a compaction of all files */ + ScanType scanType = request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES; scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners); @@ -102,14 +105,17 @@ public class DefaultCompactor extends Compactor { // When all MVCC readpoints are 0, don't write them. // See HBASE-8166, HBASE-12600, and HBASE-13389. writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true, - fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0); + fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize())); + boolean finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController); + + if (!finished) { writer.close(); store.getFileSystem().delete(writer.getPath(), false); writer = null; - throw new InterruptedIOException( "Aborting compaction of store " + store + + throw new InterruptedIOException("Aborting compaction of store " + store + " in region " + store.getRegionInfo().getRegionNameAsString() + " because it was interrupted."); } http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java index 10e3cf0..f11c259 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java @@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor { throughputController); } - private List compactInternal(StripeMultiFileWriter mw, CompactionRequest request, + private List compactInternal(StripeMultiFileWriter mw, final CompactionRequest request, byte[] majorRangeFromRow, byte[] majorRangeToRow, CompactionThroughputController throughputController) throws IOException { final Collection filesToCompact = request.getFiles(); @@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor { this.progress = new CompactionProgress(fd.maxKeyCount); long smallestReadPoint = getSmallestReadPoint(); - List scanners = createFileScanners(filesToCompact, smallestReadPoint); + List scanners = createFileScanners(filesToCompact, + smallestReadPoint, store.throttleCompaction(request.getSize())); boolean finished = false; InternalScanner scanner = null; @@ -124,7 +125,8 @@ public class StripeCompactor extends Compactor { @Override public Writer createWriter() throws IOException { return store.createWriterInTmp( - fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0); + fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0, + store.throttleCompaction(request.getSize())); } }; http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java index a025e6c..d2bfa7e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java @@ -252,7 +252,8 @@ public class TestCacheOnWrite { cacheConf = new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA), cowType.shouldBeCached(BlockType.LEAF_INDEX), - cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false); + cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, + false, false, false); } @After http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java index 4e97738..b84f0d1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java @@ -142,7 +142,7 @@ public class TestFSErrorsExposed { cacheConf, BloomType.NONE); List scanners = StoreFileScanner.getScannersForStoreFiles( - Collections.singletonList(sf), false, true, false, + Collections.singletonList(sf), false, true, false, false, // 0 is passed as readpoint because this test operates on StoreFile directly 0); KeyValueScanner scanner = scanners.get(0); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index eecacbe..9ed5d97 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -104,14 +104,14 @@ public class TestReversibleScanners { TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE); List scanners = StoreFileScanner - .getScannersForStoreFiles(Collections.singletonList(sf), false, true, - false, Long.MAX_VALUE); + .getScannersForStoreFiles(Collections.singletonList(sf), + false, true, false, false, Long.MAX_VALUE); StoreFileScanner scanner = scanners.get(0); seekTestOfReversibleKeyValueScanner(scanner); for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) { LOG.info("Setting read point to " + readPoint); scanners = StoreFileScanner.getScannersForStoreFiles( - Collections.singletonList(sf), false, true, false, readPoint); + Collections.singletonList(sf), false, true, false, false, readPoint); seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint); } } @@ -493,7 +493,7 @@ public class TestReversibleScanners { throws IOException { List fileScanners = StoreFileScanner .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true, - false, readPoint); + false, false, readPoint); List memScanners = memstore.getScanners(readPoint); List scanners = new ArrayList( fileScanners.size() + 1); http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index 50ee131..110eade 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -195,7 +195,7 @@ public class TestStripeCompactor { when(store.getFileSystem()).thenReturn(mock(FileSystem.class)); when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME)); when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), - anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); + anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); when(store.getComparator()).thenReturn(new KVComparator()); return new StripeCompactor(conf, store) { @@ -226,6 +226,7 @@ public class TestStripeCompactor { .thenReturn(mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); when(sf.createReader()).thenReturn(r); + when(sf.createReader(anyBoolean())).thenReturn(r); return new CompactionRequest(Arrays.asList(sf)); } http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index 5e62af1..a11bd70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -724,6 +724,7 @@ public class TestStripeCompactionPolicy { when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn( mock(StoreFileScanner.class)); when(sf.getReader()).thenReturn(r); + when(sf.createReader(anyBoolean())).thenReturn(r); when(sf.createReader()).thenReturn(r); return sf; } @@ -747,7 +748,7 @@ public class TestStripeCompactionPolicy { when(store.getRegionInfo()).thenReturn(info); when( store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(), - anyBoolean(), anyBoolean())).thenAnswer(writers); + anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers); Configuration conf = HBaseConfiguration.create(); final Scanner scanner = new Scanner();