hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [3/3] hbase git commit: Backport HBASE-14098 (Allow dropping caches behind compactions) to 0.98 (Liu Shaohui)
Date Thu, 24 Sep 2015 17:56:28 GMT
Backport HBASE-14098 (Allow dropping caches behind compactions) to 0.98 (Liu Shaohui)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/672d74e7
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/672d74e7
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/672d74e7

Branch: refs/heads/0.98
Commit: 672d74e7ba8c6caaedf3a549c30db5e1cbb4289b
Parents: 06f1270
Author: Andrew Purtell <apurtell@apache.org>
Authored: Thu Sep 24 09:52:21 2015 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Thu Sep 24 09:52:21 2015 -0700

----------------------------------------------------------------------
 .../hbase/io/FSDataInputStreamWrapper.java      | 38 +++++++++++++++++---
 .../hadoop/hbase/io/hfile/CacheConfig.java      | 26 +++++++++++---
 .../org/apache/hadoop/hbase/io/hfile/HFile.java | 25 +++++++++++++
 .../hbase/regionserver/DefaultStoreFlusher.java |  7 ++--
 .../hadoop/hbase/regionserver/HStore.java       | 22 ++++++++----
 .../apache/hadoop/hbase/regionserver/Store.java | 25 +++++++++++--
 .../hadoop/hbase/regionserver/StoreFile.java    | 24 +++++++++----
 .../hbase/regionserver/StoreFileInfo.java       | 12 ++++---
 .../hbase/regionserver/StoreFileScanner.java    | 29 +++++++++++++--
 .../hbase/regionserver/StripeStoreFlusher.java  |  6 +++-
 .../regionserver/compactions/Compactor.java     | 10 ++++--
 .../compactions/DefaultCompactor.java           | 14 +++++---
 .../compactions/StripeCompactor.java            |  8 +++--
 .../hadoop/hbase/io/hfile/TestCacheOnWrite.java |  2 +-
 .../hbase/regionserver/TestFSErrorsExposed.java |  2 +-
 .../regionserver/TestReversibleScanners.java    |  8 ++---
 .../hbase/regionserver/TestStripeCompactor.java |  3 +-
 .../compactions/TestStripeCompactionPolicy.java |  3 +-
 18 files changed, 212 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
index 5950585..e36ca8e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FSDataInputStreamWrapper.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hbase.io;
 
 import java.io.IOException;
+import java.lang.reflect.Method;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.fs.HFileSystem;
-import org.apache.hadoop.hbase.io.FileLink;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -33,6 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
  * see method comments.
  */
 public class FSDataInputStreamWrapper {
+  static final Log LOG = LogFactory.getLog(FSDataInputStreamWrapper.class);
+
   private final HFileSystem hfs;
   private final Path path;
   private final FileLink link;
@@ -76,14 +80,23 @@ public class FSDataInputStreamWrapper {
   private volatile int hbaseChecksumOffCount = -1;
 
   public FSDataInputStreamWrapper(FileSystem fs, Path path) throws IOException {
-    this(fs, null, path);
+    this(fs, null, path, false);
+  }
+
+  public FSDataInputStreamWrapper(FileSystem fs, Path path, boolean dropBehind) throws IOException {
+    this(fs, null, path, dropBehind);
   }
 
   public FSDataInputStreamWrapper(FileSystem fs, FileLink link) throws IOException {
-    this(fs, link, null);
+    this(fs, link, null, false);
+  }
+  public FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+                                  boolean dropBehind) throws IOException {
+    this(fs, link, null, dropBehind);
   }
 
-  private FSDataInputStreamWrapper(FileSystem fs, FileLink link, Path path) throws IOException {
+  private FSDataInputStreamWrapper(FileSystem fs, FileLink link,
+                                   Path path, boolean dropBehind) throws IOException {
     assert (path == null) != (link == null);
     this.path = path;
     this.link = link;
@@ -96,8 +109,25 @@ public class FSDataInputStreamWrapper {
     // Initially we are going to read the tail block. Open the reader w/FS checksum.
     this.useHBaseChecksumConfigured = this.useHBaseChecksum = false;
     this.stream = (link != null) ? link.open(hfs) : hfs.open(path);
+    try {
+      Class<? extends FSDataInputStream> inputStreamClass = this.stream.getClass();
+      try {
+        Method m = inputStreamClass.getDeclaredMethod("setDropBehind",
+          new Class[] { boolean.class });
+        m.invoke(stream, new Object[] { dropBehind });
+      } catch (NoSuchMethodException e) {
+        // Not supported, we can just ignore it
+      } catch (Exception e) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Failed to invoke input stream's setDropBehind method, continuing");
+        }
+      }
+    } catch (Exception e) {
+      // Skipped.
+    }
   }
 
+
   /**
    * Prepares the streams for block reader. NOT THREAD SAFE. Must be called once, after any
    * reads finish and before any other reads start (what happens in reality is we read the

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
index 376df07..d156521 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/CacheConfig.java
@@ -116,6 +116,10 @@ public class CacheConfig {
    */
   public static final String BLOCKCACHE_BLOCKSIZE_KEY = "hbase.offheapcache.minblocksize";
 
+  private static final String DROP_BEHIND_CACHE_COMPACTION_KEY =
+      "hbase.hfile.drop.behind.compaction";
+  private static final boolean DROP_BEHIND_CACHE_COMPACTION_DEFAULT = false;
+
   // Defaults
 
   public static final boolean DEFAULT_CACHE_DATA_ON_READ = true;
@@ -157,6 +161,9 @@ public class CacheConfig {
   /** Whether data blocks should be prefetched into the cache */
   private final boolean prefetchOnOpen;
 
+  /** Whether or not to drop file data from the OS blockcache behind a compaction */
+  private final boolean dropBehindCompaction;
+
   /**
    * Create a cache configuration using the specified configuration object and
    * family descriptor.
@@ -179,7 +186,8 @@ public class CacheConfig {
             DEFAULT_EVICT_ON_CLOSE) || family.shouldEvictBlocksOnClose(),
         conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
         conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY,
-            DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen()
+            DEFAULT_PREFETCH_ON_OPEN) || family.shouldPrefetchBlocksOnOpen(),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
      );
   }
 
@@ -198,8 +206,9 @@ public class CacheConfig {
         conf.getBoolean(CACHE_BLOOM_BLOCKS_ON_WRITE_KEY, DEFAULT_CACHE_BLOOMS_ON_WRITE),
         conf.getBoolean(EVICT_BLOCKS_ON_CLOSE_KEY, DEFAULT_EVICT_ON_CLOSE),
         conf.getBoolean(CACHE_DATA_BLOCKS_COMPRESSED_KEY, DEFAULT_CACHE_DATA_COMPRESSED),
-        conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN)
-    );
+        conf.getBoolean(PREFETCH_BLOCKS_ON_OPEN_KEY, DEFAULT_PREFETCH_ON_OPEN),
+        conf.getBoolean(DROP_BEHIND_CACHE_COMPACTION_KEY,DROP_BEHIND_CACHE_COMPACTION_DEFAULT)
+     );
   }
 
   /**
@@ -219,7 +228,8 @@ public class CacheConfig {
       final boolean cacheDataOnRead, final boolean inMemory,
       final boolean cacheDataOnWrite, final boolean cacheIndexesOnWrite,
       final boolean cacheBloomsOnWrite, final boolean evictOnClose,
-      final boolean cacheDataCompressed, final boolean prefetchOnOpen) {
+      final boolean cacheDataCompressed, final boolean prefetchOnOpen,
+      final boolean dropBehindCompaction) {
     this.blockCache = blockCache;
     this.cacheDataOnRead = cacheDataOnRead;
     this.inMemory = inMemory;
@@ -229,6 +239,7 @@ public class CacheConfig {
     this.evictOnClose = evictOnClose;
     this.cacheDataCompressed = cacheDataCompressed;
     this.prefetchOnOpen = prefetchOnOpen;
+    this.dropBehindCompaction = dropBehindCompaction;
   }
 
   /**
@@ -239,7 +250,8 @@ public class CacheConfig {
     this(cacheConf.blockCache, cacheConf.cacheDataOnRead, cacheConf.inMemory,
         cacheConf.cacheDataOnWrite, cacheConf.cacheIndexesOnWrite,
         cacheConf.cacheBloomsOnWrite, cacheConf.evictOnClose,
-        cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen);
+        cacheConf.cacheDataCompressed, cacheConf.prefetchOnOpen,
+        cacheConf.dropBehindCompaction);
   }
 
   /**
@@ -265,6 +277,10 @@ public class CacheConfig {
     return isBlockCacheEnabled() && cacheDataOnRead;
   }
 
+  public boolean shouldDropBehindCompaction() {
+    return dropBehindCompaction;
+  }
+
   /**
    * Should we cache a block of a particular category? We always cache
    * important blocks such as index blocks, as long as the block cache is

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
index 748d61d..fbf28ff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
@@ -25,6 +25,7 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.SequenceInputStream;
+import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -256,6 +257,7 @@ public class HFile {
     protected KVComparator comparator = KeyValue.COMPARATOR;
     protected InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    protected boolean shouldDropBehind = false;
 
     WriterFactory(Configuration conf, CacheConfig cacheConf) {
       this.conf = conf;
@@ -293,6 +295,12 @@ public class HFile {
       return this;
     }
 
+    public WriterFactory withShouldDropCacheBehind(boolean shouldDropBehind) {
+      this.shouldDropBehind = shouldDropBehind;
+      return this;
+    }
+
+
     public Writer create() throws IOException {
       if ((path != null ? 1 : 0) + (ostream != null ? 1 : 0) != 1) {
         throw new AssertionError("Please specify exactly one of " +
@@ -300,6 +308,23 @@ public class HFile {
       }
       if (path != null) {
         ostream = AbstractHFileWriter.createOutputStream(conf, fs, path, favoredNodes);
+        try {
+          Class<? extends FSDataOutputStream> outStreamClass = ostream.getClass();
+          try {
+            Method m = outStreamClass.getDeclaredMethod("setDropBehind",
+              new Class[]{ boolean.class });
+            m.invoke(ostream, new Object[] {
+              shouldDropBehind && cacheConf.shouldDropBehindCompaction() });
+          } catch (NoSuchMethodException e) {
+            // Not supported, we can just ignore it
+          } catch (Exception e) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Failed to invoke output stream's setDropBehind method, continuing");
+            }
+          }
+        } catch (UnsupportedOperationException uoe) {
+          LOG.debug("Unable to set drop behind on " + path, uoe);
+        }
       }
       return createWriter(fs, path, ostream,
                    comparator, fileContext);

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index a13de99..9dd9998 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -67,8 +67,11 @@ public class DefaultStoreFlusher extends StoreFlusher {
       synchronized (flushLock) {
         status.setStatus("Flushing " + store + ": creating writer");
         // Write the map out to the disk
-        writer = store.createWriterInTmp(
-            snapshot.size(), store.getFamily().getCompression(), false, true, true);
+        writer = store.createWriterInTmp(snapshot.size(), store.getFamily().getCompression(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ true,
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(snapshotTimeRangeTracker);
         IOException e = null;
         try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f7d7749..da83e4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -881,6 +881,15 @@ public class HStore implements Store {
     return sf;
   }
 
+  @Override
+  public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
+                                            boolean isCompaction, boolean includeMVCCReadpoint,
+                                            boolean includesTag)
+      throws IOException {
+    return createWriterInTmp(maxKeyCount, compression, isCompaction, includeMVCCReadpoint,
+        includesTag, false);
+  }
+
   /*
    * @param maxKeyCount
    * @param compression Compression algorithm to use
@@ -891,7 +900,8 @@ public class HStore implements Store {
    */
   @Override
   public StoreFile.Writer createWriterInTmp(long maxKeyCount, Compression.Algorithm compression,
-      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag)
+      boolean isCompaction, boolean includeMVCCReadpoint, boolean includesTag,
+      boolean shouldDropBehind)
   throws IOException {
     final CacheConfig writerCacheConf;
     if (isCompaction) {
@@ -916,6 +926,7 @@ public class HStore implements Store {
             .withMaxKeyCount(maxKeyCount)
             .withFavoredNodes(favoredNodes)
             .withFileContext(hFileContext)
+            .withShouldDropCacheBehind(shouldDropBehind)
             .build();
     return w;
   }
@@ -1014,9 +1025,8 @@ public class HStore implements Store {
     // TODO this used to get the store files in descending order,
     // but now we get them in ascending order, which I think is
     // actually more correct, since memstore get put at the end.
-    List<StoreFileScanner> sfScanners = StoreFileScanner
-      .getScannersForStoreFiles(storeFilesToScan, cacheBlocks, usePread, isCompaction, matcher,
-        readPt);
+    List<StoreFileScanner> sfScanners = StoreFileScanner.getScannersForStoreFiles(storeFilesToScan,
+        cacheBlocks, usePread, isCompaction, false, matcher, readPt);
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
     scanners.addAll(sfScanners);
@@ -1088,7 +1098,7 @@ public class HStore implements Store {
       CompactionThroughputController throughputController) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
-    CompactionRequest cr = compaction.getRequest();;
+    CompactionRequest cr = compaction.getRequest();
     try {
       // Do all sanity checking in here if we have a valid CompactionRequest
       // because we need to clean up after it on the way out in a finally
@@ -2025,7 +2035,7 @@ public class HStore implements Store {
     return new StoreFlusherImpl(cacheFlushId);
   }
 
-  private class StoreFlusherImpl implements StoreFlushContext {
+  private final class StoreFlusherImpl implements StoreFlushContext {
 
     private long cacheFlushSeqNum;
     private SortedSet<KeyValue> snapshot;

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index c9d101f..ec3d54a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -156,11 +156,28 @@ public interface Store extends HeapSize, StoreConfigInformation {
 
   FileSystem getFileSystem();
 
-  /*
+
+  /**
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  StoreFile.Writer createWriterInTmp(
+      long maxKeyCount,
+      Compression.Algorithm compression,
+      boolean isCompaction,
+      boolean includeMVCCReadpoint,
+      boolean includesTags
+  ) throws IOException;
+
+  /**
    * @param maxKeyCount
    * @param compression Compression algorithm to use
    * @param isCompaction whether we are creating a new file in a compaction
    * @param includeMVCCReadpoint whether we should out the MVCC readpoint
+   * @param shouldDropBehind should the writer drop caches behind writes
    * @return Writer for a new StoreFile in the tmp dir.
    */
   StoreFile.Writer createWriterInTmp(
@@ -168,9 +185,13 @@ public interface Store extends HeapSize, StoreConfigInformation {
     Compression.Algorithm compression,
     boolean isCompaction,
     boolean includeMVCCReadpoint,
-    boolean includesTags
+    boolean includesTags,
+    boolean shouldDropBehind
   ) throws IOException;
 
+
+
+
   // Compaction oriented methods
 
   boolean throttleCompaction(long compactionSize);

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index a90210a..bdf483d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -253,8 +253,8 @@ public class StoreFile {
   }
 
   /**
-   * @return True if this is a StoreFile Reference; call after {@link #open()}
-   * else may get wrong answer.
+   * @return True if this is a StoreFile Reference; call
+   * after {@link #open(boolean canUseDropBehind)} else may get wrong answer.
    */
   public boolean isReference() {
     return this.fileInfo.isReference();
@@ -367,13 +367,13 @@ public class StoreFile {
    * @throws IOException
    * @see #closeReader(boolean)
    */
-  private Reader open() throws IOException {
+  private Reader open(boolean canUseDropBehind) throws IOException {
     if (this.reader != null) {
       throw new IllegalAccessError("Already open");
     }
 
     // Open the StoreFile.Reader
-    this.reader = fileInfo.open(this.fs, this.cacheConf);
+    this.reader = fileInfo.open(this.fs, this.cacheConf, canUseDropBehind);
 
     // Load up indices and fileinfo. This also loads Bloom filter type.
     metadataMap = Collections.unmodifiableMap(this.reader.loadFileInfo());
@@ -463,14 +463,18 @@ public class StoreFile {
     return this.reader;
   }
 
+  public Reader createReader() throws IOException {
+    return createReader(false);
+  }
+
   /**
    * @return Reader for StoreFile. creates if necessary
    * @throws IOException
    */
-  public Reader createReader() throws IOException {
+  public Reader createReader(boolean canUseDropBehind) throws IOException {
     if (this.reader == null) {
       try {
-        this.reader = open();
+        this.reader = open(canUseDropBehind);
       } catch (IOException e) {
         try {
           this.closeReader(true);
@@ -547,6 +551,8 @@ public class StoreFile {
     private Path filePath;
     private InetSocketAddress[] favoredNodes;
     private HFileContext fileContext;
+    private boolean shouldDropCacheBehind = false;
+
     public WriterBuilder(Configuration conf, CacheConfig cacheConf,
         FileSystem fs) {
       this.conf = conf;
@@ -612,6 +618,11 @@ public class StoreFile {
       this.fileContext = fileContext;
       return this;
     }
+
+    public WriterBuilder withShouldDropCacheBehind(boolean shouldDropCacheBehind) {
+      this.shouldDropCacheBehind = shouldDropCacheBehind;
+      return this;
+    }
     /**
      * Create a store file writer. Client is responsible for closing file when
      * done. If metadata, add BEFORE closing using
@@ -1262,6 +1273,7 @@ public class StoreFile {
         case ROWCOL:
           key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
               colOffset, colLen);
+
           break;
 
         default:

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
index e4d0e83..5e46d94 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.util.FSUtils;
 
@@ -175,21 +174,24 @@ public class StoreFileInfo {
    * @return The StoreFile.Reader for the file
    */
   public StoreFile.Reader open(final FileSystem fs,
-      final CacheConfig cacheConf) throws IOException {
+      final CacheConfig cacheConf, final boolean canUseDropBehind) throws IOException {
     FSDataInputStreamWrapper in;
     FileStatus status;
 
+    final boolean doDropBehind = canUseDropBehind && cacheConf.shouldDropBehindCompaction();
     if (this.link != null) {
       // HFileLink
-      in = new FSDataInputStreamWrapper(fs, this.link);
+      in = new FSDataInputStreamWrapper(fs, this.link, doDropBehind);
       status = this.link.getFileStatus(fs);
     } else if (this.reference != null) {
       // HFile Reference
       Path referencePath = getReferredToFile(this.getPath());
-      in = new FSDataInputStreamWrapper(fs, referencePath);
+      in = new FSDataInputStreamWrapper(fs, referencePath,
+          doDropBehind);
       status = fs.getFileStatus(referencePath);
     } else {
-      in = new FSDataInputStreamWrapper(fs, this.getPath());
+      in = new FSDataInputStreamWrapper(fs, this.getPath(),
+          doDropBehind);
       status = fileStatus;
     }
     long length = status.getLen();

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index e4116fd..d9e2eab 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -87,7 +87,7 @@ public class StoreFileScanner implements KeyValueScanner {
       boolean cacheBlocks,
       boolean usePread, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks,
-                                   usePread, false, readPt);
+                                   usePread, false, false, readPt);
   }
 
   /**
@@ -97,7 +97,17 @@ public class StoreFileScanner implements KeyValueScanner {
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
       boolean isCompaction, long readPt) throws IOException {
     return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
-        null, readPt);
+        false, null, readPt);
+  }
+
+  /**
+   * Return an array of scanners corresponding to the given set of store files.
+   */
+  public static List<StoreFileScanner> getScannersForStoreFiles(
+      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
+      boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
+        useDropBehind, null, readPt);
   }
 
   /**
@@ -108,10 +118,23 @@ public class StoreFileScanner implements KeyValueScanner {
   public static List<StoreFileScanner> getScannersForStoreFiles(
       Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
       boolean isCompaction, ScanQueryMatcher matcher, long readPt) throws IOException {
+    return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, false,
+      matcher, readPt);
+  }
+
+  /**
+   * Return an array of scanners corresponding to the given set of store files,
+   * And set the ScanQueryMatcher for each store file scanner for further
+   * optimization
+   */
+  public static List<StoreFileScanner> getScannersForStoreFiles(
+      Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
+      boolean isCompaction, boolean canUseDrop,
+      ScanQueryMatcher matcher, long readPt) throws IOException {
     List<StoreFileScanner> scanners = new ArrayList<StoreFileScanner>(
         files.size());
     for (StoreFile file : files) {
-      StoreFile.Reader r = file.createReader();
+      StoreFile.Reader r = file.createReader(canUseDrop);
       StoreFileScanner scanner = r.getStoreFileScanner(cacheBlocks, usePread,
           isCompaction, readPt);
       scanner.setScanQueryMatcher(matcher);

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index b03c8aa..a52b1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -115,7 +115,11 @@ public class StripeStoreFlusher extends StoreFlusher {
       @Override
       public Writer createWriter() throws IOException {
         StoreFile.Writer writer = store.createWriterInTmp(
-            kvCount, store.getFamily().getCompression(), false, true, true);
+            kvCount, store.getFamily().getCompression(),
+            /* isCompaction = */ false,
+            /* includeMVCCReadpoint = */ true,
+            /* includesTags = */ true,
+            /* shouldDropBehind = */ false);
         writer.setTimeRangeTracker(tracker);
         return writer;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index c4be217..42e4953 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -163,8 +163,14 @@ public abstract class Compactor {
    * @return Scanners.
    */
   protected List<StoreFileScanner> createFileScanners(
-      final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
-    return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
+      final Collection<StoreFile> filesToCompact,
+      long smallestReadPoint,
+      boolean useDropBehind) throws IOException {
+    return StoreFileScanner.getScannersForStoreFiles(filesToCompact,
+        /* cache blocks = */ false,
+        /* use pread = */ false,
+        /* is compaction */ true,
+        /* use Drop Behind */ useDropBehind,
       smallestReadPoint);
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index b505efc..dfbd3f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -60,17 +60,19 @@ public class DefaultCompactor extends Compactor {
 
     List<StoreFileScanner> scanners;
     Collection<StoreFile> readersToClose;
-    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", false)) {
+    if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
       // clone all StoreFiles, so we'll do the compaction on a independent copy of StoreFiles,
       // HFileFiles, and their readers
       readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
       for (StoreFile f : request.getFiles()) {
         readersToClose.add(new StoreFile(f));
       }
-      scanners = createFileScanners(readersToClose, smallestReadPoint);
+      scanners = createFileScanners(readersToClose, smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     } else {
       readersToClose = Collections.emptyList();
-      scanners = createFileScanners(request.getFiles(), smallestReadPoint);
+      scanners = createFileScanners(request.getFiles(), smallestReadPoint,
+          store.throttleCompaction(request.getSize()));
     }
 
     StoreFile.Writer writer = null;
@@ -94,14 +96,16 @@ public class DefaultCompactor extends Compactor {
         // Create the writer even if no kv(Empty store file is also ok),
         // because we need record the max seq id for the store file, see HBASE-6059
         writer = store.createWriterInTmp(fd.maxKeyCount, this.compactionCompression, true,
-            fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
+          fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
+
         boolean finished =
             performCompaction(scanner, writer, smallestReadPoint, throughputController);
+
         if (!finished) {
           writer.close();
           store.getFileSystem().delete(writer.getPath(), false);
           writer = null;
-          throw new InterruptedIOException( "Aborting compaction of store " + store +
+          throw new InterruptedIOException("Aborting compaction of store " + store +
               " in region " + store.getRegionInfo().getRegionNameAsString() +
               " because it was interrupted.");
          }

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 7a5cbb4..342ecce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -81,7 +81,7 @@ public class StripeCompactor extends Compactor {
       throughputController);
   }
 
-  private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
+  private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
       CompactionThroughputController throughputController) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
@@ -89,7 +89,8 @@ public class StripeCompactor extends Compactor {
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
     long smallestReadPoint = getSmallestReadPoint();
-    List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
+    List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
+        smallestReadPoint, store.throttleCompaction(request.getSize()));
 
     boolean finished = false;
     InternalScanner scanner = null;
@@ -117,7 +118,8 @@ public class StripeCompactor extends Compactor {
         @Override
         public Writer createWriter() throws IOException {
           return store.createWriterInTmp(
-              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
+              fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
+              store.throttleCompaction(request.getSize()));
         }
       };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
index 3952fa4..d7f18dd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestCacheOnWrite.java
@@ -245,7 +245,7 @@ public class TestCacheOnWrite {
     cacheConf =
         new CacheConfig(blockCache, true, true, cowType.shouldBeCached(BlockType.DATA),
         cowType.shouldBeCached(BlockType.LEAF_INDEX),
-        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false);
+        cowType.shouldBeCached(BlockType.BLOOM_CHUNK), false, cacheCompressedData, false, false);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
index 6d7d675..94a5ba7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFSErrorsExposed.java
@@ -142,7 +142,7 @@ public class TestFSErrorsExposed {
       cacheConf, BloomType.NONE);
 
     List<StoreFileScanner> scanners = StoreFileScanner.getScannersForStoreFiles(
-        Collections.singletonList(sf), false, true, false,
+        Collections.singletonList(sf), false, true, false, false,
         // 0 is passed as readpoint because this test operates on StoreFile directly
         0);
     KeyValueScanner scanner = scanners.get(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
index 14229de..b7bc6f2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java
@@ -104,14 +104,14 @@ public class TestReversibleScanners {
           TEST_UTIL.getConfiguration(), cacheConf, BloomType.NONE);
 
       List<StoreFileScanner> scanners = StoreFileScanner
-          .getScannersForStoreFiles(Collections.singletonList(sf), false, true,
-              false, Long.MAX_VALUE);
+          .getScannersForStoreFiles(Collections.singletonList(sf),
+              false, true, false, false, Long.MAX_VALUE);
       StoreFileScanner scanner = scanners.get(0);
       seekTestOfReversibleKeyValueScanner(scanner);
       for (int readPoint = 0; readPoint < MAXMVCC; readPoint++) {
         LOG.info("Setting read point to " + readPoint);
         scanners = StoreFileScanner.getScannersForStoreFiles(
-            Collections.singletonList(sf), false, true, false, readPoint);
+            Collections.singletonList(sf), false, true, false, false, readPoint);
         seekTestOfReversibleKeyValueScannerWithMVCC(scanners.get(0), readPoint);
       }
     }
@@ -493,7 +493,7 @@ public class TestReversibleScanners {
       throws IOException {
     List<StoreFileScanner> fileScanners = StoreFileScanner
         .getScannersForStoreFiles(Lists.newArrayList(sf1, sf2), false, true,
-            false, readPoint);
+            false, false, readPoint);
     List<KeyValueScanner> memScanners = memstore.getScanners(readPoint);
     List<KeyValueScanner> scanners = new ArrayList<KeyValueScanner>(
         fileScanners.size() + 1);

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
index 5471ee5..fb123d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
@@ -186,7 +186,7 @@ public class TestStripeCompactor {
     when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
     when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
     when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
-        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
     when(store.getComparator()).thenReturn(new KVComparator());
 
     return new StripeCompactor(conf, store) {
@@ -217,6 +217,7 @@ public class TestStripeCompactor {
       .thenReturn(mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     return new CompactionRequest(Arrays.asList(sf));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/672d74e7/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index 8c5c67a..33c28a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -723,6 +723,7 @@ public class TestStripeCompactionPolicy {
     when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong())).thenReturn(
       mock(StoreFileScanner.class));
     when(sf.getReader()).thenReturn(r);
+    when(sf.createReader(anyBoolean())).thenReturn(r);
     when(sf.createReader()).thenReturn(r);
     return sf;
   }
@@ -746,7 +747,7 @@ public class TestStripeCompactionPolicy {
     when(store.getRegionInfo()).thenReturn(info);
     when(
       store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
-        anyBoolean(), anyBoolean())).thenAnswer(writers);
+        anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
 
     Configuration conf = HBaseConfiguration.create();
     final Scanner scanner = new Scanner();


Mime
View raw message