hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-14098 Allow dropping caches behind compactions
Date Thu, 13 Aug 2015 02:47:20 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 222102196 -> 34b706af4


HBASE-14098 Allow dropping caches behind compactions


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/34b706af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/34b706af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/34b706af

Branch: refs/heads/branch-1.2
Commit: 34b706af4d44ad7dff8ac5f35eec304d7dc0ccab
Parents: 2221021
Author: Elliott Clark <eclark@apache.org>
Authored: Wed Aug 12 14:32:48 2015 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Wed Aug 12 19:43:16 2015 -0700

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      | 22 +++++++++++++----
 .../hadoop/hbase/io/hfile/CacheConfig.java      | 19 +++++++++++----
 .../org/apache/hadoop/hbase/io/hfile/HFile.java | 12 ++++++++++
 .../hbase/regionserver/DefaultStoreFlusher.java |  7 ++++--
 .../hadoop/hbase/regionserver/HStore.java       | 22 ++++++++++++-----
 .../apache/hadoop/hbase/regionserver/Store.java | 25 ++++++++++++++++++--
 .../hadoop/hbase/regionserver/StoreFile.java    | 24 ++++++++++++++-----
 .../hbase/regionserver/StoreFileInfo.java       | 11 +++++----
 .../hbase/regionserver/StoreFileScanner.java    | 11 +++++----
 .../hbase/regionserver/StripeStoreFlusher.java  |  6 ++++-
 .../regionserver/compactions/Compactor.java     | 10 ++++++--
 .../compactions/DefaultCompactor.java           | 16 +++++++++----
 .../compactions/StripeCompactor.java            |  8 ++++---
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |  3 ++-
 .../hbase/regionserver/TestFSErrorsExposed.java |  2 +-
 .../regionserver/TestReversibleScanners.java    |  8 +++----
 .../hbase/regionserver/TestStripeCompactor.java |  3 ++-
 .../compactions/TestStripeCompactionPolicy.java |  3 ++-
 18 files changed, 160 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 5950585..b06be6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -76,14 +75,23 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path);
+    this(fs, null, path, false);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException
{
+    this(fs, null, path, dropBehind);
   }
 
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null);
+    this(fs, link, null, false);
+  }
+  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+                                  boolean dropBehind) throws IOException {
+    this(fs, link, null, dropBehind);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException
{
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+                                   Path path, boolean dropBehind) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
@@ -96,8 +104,14 @@ public class FSDataInputStreamWrapper {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    try {
+      this.stream.setDropBehind(dropBehind);
+    } catch (Exception e) {
+      // Skipped.
+    }
   }
 
+
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
    * reads finish and before any other reads start (what happens in reality is we read the

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 26eb1da..ee2d001 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -131,6 +131,8 @@ public class CacheConfig {
   private static final boolean EXTERNAL_BLOCKCACHE_DEFAULT = false;
 
   private static final String EXTERNAL_BLOCKCACHE_CLASS_KEY="hbase.blockcache.external.class";
+  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY="hbase.hfile.drop.behind.compaction";
+  private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = true;
 
   /**
    * Enum of all built in external block caches.
@@ -194,6 +196,8 @@ public class CacheConfig {
    */
   private boolean cacheDataInL1;
 
+  private final boolean dropBehindCompaction;
+
   /**
    * Create a cache configuration using the specified configuration object and
    * family descriptor.
@@ -218,7 +222,8 @@ public class CacheConfig {
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
             DEFAULT_PREFETCH_ON_OPEN) || family.isPrefetchBlocksOnOpen(),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1()
+            HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1) || family.isCacheDataInL1(),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -239,7 +244,8 @@ public class CacheConfig {
         conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
         conf.getBoolean(HColumnDescriptor.CACHE_DATA_IN_L1,
-          HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1)
+          HColumnDescriptor.DEFAULT_CACHE_DATA_IN_L1),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -264,7 +270,7 @@ public class CacheConfig {
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
       final boolean cacheDataCompressed, final boolean prefetchOnOpen,
-      final boolean cacheDataInL1) {
+      final boolean cacheDataInL1, final boolean dropBehindCompaction) {
     this.blockCache = blockCache;
     this.cacheDataOnRead = cacheDataOnRead;
     this.inMemory = inMemory;
@@ -275,6 +281,7 @@ public class CacheConfig {
     this.cacheDataCompressed = cacheDataCompressed;
     this.prefetchOnOpen = prefetchOnOpen;
     this.cacheDataInL1 = cacheDataInL1;
+    this.dropBehindCompaction = dropBehindCompaction;
     LOG.info(this);
   }
 
@@ -287,7 +294,7 @@ public class CacheConfig {
         cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
         cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
         cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
-        cacheConf.cacheDataInL1);
+        cacheConf.cacheDataInL1, cacheConf.dropBehindCompaction);
   }
 
   /**
@@ -314,6 +321,10 @@ public class CacheConfig {
     return isBlockCacheEnabled() && cacheDataOnRead;
   }
 
+  public boolean shouldDropBehindCompaction() {
+    return dropBehindCompaction;
+  }
+
   /**
    * Should we cache a block of a particular category? We always cache
    * important blocks such as index blocks, as long as the block cache is

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 0b06c33..6741957 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -248,6 +248,7 @@ public class HFile {
     protected KVComparator comparator = KeyValue.COMPARATOR;
     protected InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    protected boolean shouldDropBehind = false;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -285,6 +286,12 @@ public class HFile {
       return this;
     }
 
+    public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+      this.shouldDropBehind = shouldDropBehind;
+      return this;
+    }
+
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -292,6 +299,11 @@ public class HFile {
       }
       if (path != null) {
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
+        try {
+          ostream.setDropBehind(shouldDropBehind && cacheConf.shouldDropBehindCompaction());
+        } catch (UnsupportedOperationException uoe) {
+          LOG.debug("Unable to set drop behind on " + path, uoe);
+        }
       }
       return createWriter(fs, path, ostream,
                    comparator, fileContext);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 474a44a..da89129 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -63,8 +63,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(
-            cellsCount, store.getFamily().getCompression(), false, true, snapshot.isTagsPresent());
+        writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ snapshot.isTagsPresent(),
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
         IOException e = null;
         try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 0c6b2f0..e15db38 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -966,6 +966,15 @@ public class HStore implements Store {
     return sf;
   }
 
+  @Override
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+                                            boolean isCompaction, boolean includeMVCCReadpoint,
+                                            boolean includesTag)
+      throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+        includesTag, false);
+  }
+
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
@@ -976,7 +985,8 @@ public class HStore implements Store {
    */
   @Override
   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+      boolean shouldDropBehind)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -1001,6 +1011,7 @@ public class HStore implements Store {
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
+            .withShouldDropCacheBehind(shouldDropBehind)
             .build();
     return w;
   }
@@ -1102,9 +1113,8 @@ public class HStore implements Store {
     // TODO this used to get the store files in descending order,
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
-    List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
-        readPt);
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -1176,7 +1186,7 @@ public class HStore implements Store {
       CompactionThroughputController throughputController) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
-    CompactionRequest cr = compaction.getRequest();;
+    CompactionRequest cr = compaction.getRequest();
     try {
       // Do all sanity checking in here if we have a valid CompactionRequest
       // because we need to clean up after it on the way out in a finally
@@ -2151,7 +2161,7 @@ public class HStore implements Store {
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlushContext {
+  private final class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
     private MemStoreSnapshot snapshot;

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index da2cb10..5a13ba8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -159,11 +159,28 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
 
   FileSystem getFileSystem();
 
-  /*
+
+  /**
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  StoreFile.Writer createWriterInTmp(
+      long maxKeyCount,
+      Compression.Algorithm compression,
+      boolean isCompaction,
+      boolean includeMVCCReadpoint,
+      boolean includesTags
+  ) throws IOException;
+
+  /**
    * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
    * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @param shouldDropBehind should the writer drop caches behind writes
    * @return Writer for a new StoreFile in the tmp dir.
    */
   StoreFile.Writer createWriterInTmp(
@@ -171,9 +188,13 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
     Compression.Algorithm compression,
     boolean isCompaction,
     boolean includeMVCCReadpoint,
-    boolean includesTags
+    boolean includesTags,
+    boolean shouldDropBehind
   ) throws IOException;
 
+
+
+
   // Compaction oriented methods
 
   boolean throttleCompaction(long compactionSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index dd86a5d..acd4233 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -247,8 +247,8 @@ public class StoreFile {
   }
 
   /**
-   * @return True if this is a StoreFile Reference; call after {@link #open()}
-   * else may get wrong answer.
+   * @return True if this is a StoreFile Reference; call
+   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -366,13 +366,13 @@ public class StoreFile {
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private Reader open() throws IOException {
+  private Reader open(boolean canUseDropBehind) throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -462,14 +462,18 @@ public class StoreFile {
     return this.reader;
   }
 
+  public Reader createReader() throws IOException {
+    return createReader(false);
+  }
+
   /**
    * @return Reader for StoreFile. creates if necessary
    * @throws IOException
    */
-  public Reader createReader() throws IOException {
+  public Reader createReader(boolean canUseDropBehind) throws IOException {
     if (this.reader == null) {
       try {
-        this.reader = open();
+        this.reader = open(canUseDropBehind);
       } catch (IOException e) {
         try {
           this.closeReader(true);
@@ -546,6 +550,8 @@ public class StoreFile {
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    private boolean shouldDropCacheBehind = false;
+
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
       this.conf = conf;
@@ -611,6 +617,11 @@ public class StoreFile {
       this.fileContext = fileContext;
       return this;
     }
+
+    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+      this.shouldDropCacheBehind = shouldDropCacheBehind;
+      return this;
+    }
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -1253,6 +1264,7 @@ public class StoreFile {
         case ROWCOL:
           key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
               colOffset, colLen);
+
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index 6516a3e..54f200f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -218,21 +218,24 @@ public class StoreFileInfo {
    * @return The StoreFile.Reader for the file
    */
   public StoreFile.Reader open(final FileSystem fs,
-      final CacheConfig cacheConf) throws IOException {
+      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
+    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath);
+      in = new FSDataInputStreamWrapper(fs, referencePath,
+          doDropBehind);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath());
+      in = new FSDataInputStreamWrapper(fs, this.getPath(),
+          doDropBehind);
       status = fs.getFileStatus(initialPath);
     }
     long length = status.getLen();

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 961352d..dc22931 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -90,7 +90,7 @@ public class StoreFileScanner implements KeyValueScanner {
       boolean cacheBlocks,
       boolean usePread, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks,
-                                   usePread, false, readPt);
+                                   usePread, false, false, readPt);
   }
 
   /**
@@ -98,9 +98,9 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, long readPt) throws IOException {
+      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        null, readPt);
+        useDropBehind, null, readPt);
   }
 
   /**
@@ -110,11 +110,12 @@ public class StoreFileScanner implements KeyValueScanner {
    */
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
-      boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+      boolean isCompaction, boolean canUseDrop,
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
     for (StoreFile file : files) {
-      StoreFile.Reader r = file.createReader();
+      StoreFile.Reader r = file.createReader(canUseDrop);
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
           isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 136934c..37e7402 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -109,7 +109,11 @@ public class StripeStoreFlusher extends StoreFlusher {
       @Override
       public Writer createWriter() throws IOException {
         StoreFile.Writer writer = store.createWriterInTmp(
-            kvCount, store.getFamily().getCompression(), false, true, true);
+            kvCount, store.getFamily().getCompression(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ true,
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(tracker);
         return writer;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 2c34c70..a515b87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -185,8 +185,14 @@ public abstract class Compactor {
    * @return Scanners.
    */
   protected List<StoreFileScanner> createFileScanners(
-      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException
{
-    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+      final Collection<StoreFile> filesToCompact,
+      long smallestReadPoint,
+      boolean useDropBehind) throws IOException {
+    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+        /* cache blocks = */ false,
+        /* use pread = */ false,
+        /* is compaction */ true,
+        /* use Drop Behind */ useDropBehind,
       smallestReadPoint);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index bc8dd01..ed441d3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor {
 
     List<StoreFileScanner> scanners;
     Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
       // HFileFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
         readersToClose.add(new StoreFile(f));
       }
-      scanners = createFileScanners(readersToClose, smallestReadPoint);
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     }
 
     StoreFile.Writer writer = null;
@@ -81,6 +83,7 @@ public class DefaultCompactor extends Compactor {
       InternalScanner scanner = null;
       try {
         /* Include deletes, unless we are doing a compaction of all files */
+
         ScanType scanType =
             request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES : ScanType.COMPACT_RETAIN_DELETES;
         scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
@@ -102,14 +105,17 @@ public class DefaultCompactor extends Compactor {
         // When all MVCC readpoints are 0, don't write them.
         // See HBASE-8166, HBASE-12600, and HBASE-13389.
         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0);
+          fd.maxMVCCReadpoint > 0, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+
         boolean finished =
             performCompaction(scanner, writer, smallestReadPoint, cleanSeqId, throughputController);
+
+
         if (!finished) {
           writer.close();
           store.getFileSystem().delete(writer.getPath(), false);
           writer = null;
-          throw new InterruptedIOException( "Aborting compaction of store " + store +
+          throw new InterruptedIOException("Aborting compaction of store " + store +
               " in region " + store.getRegionInfo().getRegionNameAsString() +
               " because it was interrupted.");
          }

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 10e3cf0..f11c259 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor {
       throughputController);
   }
 
-  private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+  private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest
request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
       CompactionThroughputController throughputController) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor {
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
+        smallestReadPoint, store.throttleCompaction(request.getSize()));
 
     boolean finished = false;
     InternalScanner scanner = null;
@@ -124,7 +125,8 @@ public class StripeCompactor extends Compactor {
         @Override
         public Writer createWriter() throws IOException {
           return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+              store.throttleCompaction(request.getSize()));
         }
       };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index a025e6c..d2bfa7e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -252,7 +252,8 @@ public class TestCacheOnWrite {
     cacheConf =
         new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
         cowType.shouldBeCached(BlockType.LEAF_INDEX),
-        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false,
false);
+        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData,
+            false, false, false);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 4e97738..b84f0d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -142,7 +142,7 @@ public class TestFSErrorsExposed {
       cacheConf, BloomType.NONE);
 
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true, false,
+        Collections.singletonList(sf), false, true, false, false,
         // 0 is passed as readpoint because this test operates on StoreFile directly
         0);
     KeyValueScanner scanner = scanners.get(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index eecacbe..9ed5d97 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -104,14 +104,14 @@ public class TestReversibleScanners {
           TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
 
       List<StoreFileScanner> scanners = StoreFileScanner
-          .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
-              false, Long.MAX_VALUE);
+          .getScannersForStoreFiles(Collections.singletonList(sf),
+              false, true, false, false, Long.MAX_VALUE);
       StoreFileScanner scanner = scanners.get(0);
       seekTestOfReversibleKeyValueScanner(scanner);
       for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
         LOG.info("Setting read point to " + readPoint);
         scanners = StoreFileScanner.getScannersForStoreFiles(
-            Collections.singletonList(sf), false, true, false, readPoint);
+            Collections.singletonList(sf), false, true, false, false, readPoint);
         seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
       }
     }
@@ -493,7 +493,7 @@ public class TestReversibleScanners {
       throws IOException {
     List<StoreFileScanner> fileScanners = StoreFileScanner
         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
-            false, readPoint);
+            false, false, readPoint);
     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
         fileScanners.size() + 1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index 50ee131..110eade 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -195,7 +195,7 @@ public class TestStripeCompactor {
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(new KVComparator());
 
     return new StripeCompactor(conf, store) {
@@ -226,6 +226,7 @@ public class TestStripeCompactor {
       .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     return new CompactionRequest(Arrays.asList(sf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/34b706af/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 5e62af1..a11bd70 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -724,6 +724,7 @@ public class TestStripeCompactionPolicy {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
       mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
     return sf;
   }
@@ -747,7 +748,7 @@ public class TestStripeCompactionPolicy {
     when(store.getRegionInfo()).thenReturn(info);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
-        anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     final Scanner scanner = new Scanner();


Mime
View raw message