hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-19033 Allow CP users to change versions and TTL before opening StoreScanner
Date Sun, 29 Oct 2017 14:02:19 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-2 8fb22d9c1 -> 15388d4e1


HBASE-19033 Allow CP users to change versions and TTL before opening StoreScanner


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/15388d4e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/15388d4e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/15388d4e

Branch: refs/heads/branch-2
Commit: 15388d4e169f28eed432b94bf9228e72c6bf920d
Parents: 8fb22d9
Author: zhangduo <zhangduo@apache.org>
Authored: Sun Oct 29 21:18:55 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Sun Oct 29 21:53:49 2017 +0800

----------------------------------------------------------------------
 .../example/WriteHeavyIncrementObserver.java    | 252 +++++++++++++++++++
 .../example/ZooKeeperScanPolicyObserver.java    |  37 +--
 .../TestWriteHeavyIncrementObserver.java        | 153 +++++++++++
 .../hbase/coprocessor/RegionObserver.java       |  48 +++-
 .../hbase/mob/DefaultMobStoreCompactor.java     |   8 +-
 .../regionserver/CustomizedScanInfoBuilder.java |  69 +++++
 .../hadoop/hbase/regionserver/HMobStore.java    |   9 +-
 .../hadoop/hbase/regionserver/HRegion.java      |   1 +
 .../hadoop/hbase/regionserver/HStore.java       |  23 +-
 .../MemStoreCompactorSegmentsIterator.java      |   9 +-
 .../regionserver/RegionCoprocessorHost.java     |  48 +++-
 .../hadoop/hbase/regionserver/ScanInfo.java     |  62 +++--
 .../hadoop/hbase/regionserver/ScanOptions.java  |  62 +++++
 .../hadoop/hbase/regionserver/StoreFlusher.java |  11 +-
 .../hadoop/hbase/regionserver/StoreScanner.java |  69 +++--
 .../regionserver/compactions/Compactor.java     |  80 +++---
 .../compactions/StripeCompactor.java            |  13 +-
 .../client/TestFromClientSideScanExcpetion.java |   9 +-
 .../hbase/regionserver/TestCompaction.java      |   6 +-
 .../TestDefaultCompactSelection.java            |   5 +-
 .../hadoop/hbase/regionserver/TestHRegion.java  |   2 +
 .../hbase/regionserver/TestMajorCompaction.java |   4 +-
 .../compactions/TestDateTieredCompactor.java    |  11 +-
 .../compactions/TestStripeCompactionPolicy.java |  12 +-
 .../compactions/TestStripeCompactor.java        |  11 +-
 25 files changed, 828 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
new file mode 100644
index 0000000..e9b590d
--- /dev/null
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/WriteHeavyIncrementObserver.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import java.io.IOException;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.mutable.MutableLong;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
+import org.apache.hadoop.hbase.regionserver.ScanType;
+import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import org.apache.hadoop.hbase.shaded.com.google.common.math.IntMath;
+
+/**
+ * An example of implementing a counter with far fewer reads than writes, i.e., write heavy.
+ * <p>
+ * We convert each increment to a put, and aggregate the values on get. As a consequence, the
+ * return value of increment carries no useful information.
+ * <p>
+ * Notice that this is only an example so we do not handle most corner cases, for example, you must
+ * provide a qualifier when doing a get.
+ */
+public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver {
+
+  @Override
+  public Optional<RegionObserver> getRegionObserver() {
+    return Optional.of(this);
+  }
+
+  @Override
+  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+    options.readAllVersions();
+  }
+
+  private Cell createCell(byte[] row, byte[] family, byte[] qualifier, long ts, long value) {
+    return CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
+        .setType(CellBuilder.DataType.Put).setFamily(family).setQualifier(qualifier)
+        .setTimestamp(ts).setValue(Bytes.toBytes(value)).build();
+  }
+
+  private InternalScanner wrap(byte[] family, InternalScanner scanner) {
+    return new InternalScanner() {
+
+      private List<Cell> srcResult = new ArrayList<>();
+
+      private byte[] row;
+
+      private byte[] qualifier;
+
+      private long timestamp;
+
+      private long sum;
+
+      @Override
+      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
+        boolean moreRows = scanner.next(srcResult, scannerContext);
+        if (srcResult.isEmpty()) {
+          if (!moreRows && row != null) {
+            result.add(createCell(row, family, qualifier, timestamp, sum));
+          }
+          return moreRows;
+        }
+        Cell firstCell = srcResult.get(0);
+        // Check if there is a row change first. All the cells will come from the same row, so
+        // checking the first one is enough.
+        if (row == null) {
+          row = CellUtil.cloneRow(firstCell);
+          qualifier = CellUtil.cloneQualifier(firstCell);
+        } else if (!CellUtil.matchingRows(firstCell, row)) {
+          result.add(createCell(row, family, qualifier, timestamp, sum));
+          row = CellUtil.cloneRow(firstCell);
+          qualifier = CellUtil.cloneQualifier(firstCell);
+          sum = 0;
+        }
+        srcResult.forEach(c -> {
+          if (CellUtil.matchingQualifier(c, qualifier)) {
+            sum += Bytes.toLong(c.getValueArray(), c.getValueOffset());
+          } else {
+            result.add(createCell(row, family, qualifier, timestamp, sum));
+            qualifier = CellUtil.cloneQualifier(c);
+            sum = Bytes.toLong(c.getValueArray(), c.getValueOffset());
+          }
+          timestamp = c.getTimestamp();
+        });
+        if (!moreRows) {
+          result.add(createCell(row, family, qualifier, timestamp, sum));
+        }
+        srcResult.clear();
+        return moreRows;
+      }
+
+      @Override
+      public void close() throws IOException {
+        scanner.close();
+      }
+    };
+  }
+
+  @Override
+  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
+    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+  }
+
+  @Override
+  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {
+    options.readAllVersions();
+  }
+
+  @Override
+  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {
+    return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
+  }
+
+  @Override
+  public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
+      throws IOException {
+    Scan scan =
+        new Scan().withStartRow(get.getRow()).withStopRow(get.getRow(), true).readAllVersions();
+    NavigableMap<byte[], NavigableMap<byte[], MutableLong>> sums =
+        new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    get.getFamilyMap().forEach((cf, cqs) -> {
+      NavigableMap<byte[], MutableLong> ss = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      sums.put(cf, ss);
+      cqs.forEach(cq -> {
+        ss.put(cq, new MutableLong(0));
+        scan.addColumn(cf, cq);
+      });
+    });
+    List<Cell> cells = new ArrayList<>();
+    try (RegionScanner scanner = c.getEnvironment().getRegion().getScanner(scan)) {
+      boolean moreRows;
+      do {
+        moreRows = scanner.next(cells);
+        for (Cell cell : cells) {
+          byte[] family = CellUtil.cloneFamily(cell);
+          byte[] qualifier = CellUtil.cloneQualifier(cell);
+          long value = Bytes.toLong(cell.getValueArray(), cell.getValueOffset());
+          sums.get(family).get(qualifier).add(value);
+        }
+        cells.clear();
+      } while (moreRows);
+    }
+    sums.forEach((cf, m) -> m.forEach((cq, s) -> result
+        .add(createCell(get.getRow(), cf, cq, HConstants.LATEST_TIMESTAMP, s.longValue()))));
+    c.bypass();
+  }
+
+  private final int mask;
+  private final MutableLong[] lastTimestamps;
+  {
+    int stripes =
+        1 << IntMath.log2(Runtime.getRuntime().availableProcessors(), RoundingMode.CEILING);
+    lastTimestamps =
+        IntStream.range(0, stripes).mapToObj(i -> new MutableLong()).toArray(MutableLong[]::new);
+    mask = stripes - 1;
+  }
+
+  // We need to make sure that different puts use different timestamps, otherwise we may lose some
+  // increments. This is a known issue for HBase.
+  private long getUniqueTimestamp(byte[] row) {
+    int slot = Bytes.hashCode(row) & mask;
+    MutableLong lastTimestamp = lastTimestamps[slot];
+    long now = System.currentTimeMillis();
+    synchronized (lastTimestamp) {
+      long pt = lastTimestamp.longValue() >> 10;
+      if (now > pt) {
+        lastTimestamp.setValue(now << 10);
+      } else {
+        lastTimestamp.increment();
+      }
+      return lastTimestamp.longValue();
+    }
+  }
+
+  @Override
+  public Result preIncrement(ObserverContext<RegionCoprocessorEnvironment> c, Increment increment)
+      throws IOException {
+    byte[] row = increment.getRow();
+    Put put = new Put(row);
+    long ts = getUniqueTimestamp(row);
+    for (Map.Entry<byte[], List<Cell>> entry : increment.getFamilyCellMap().entrySet()) {
+      for (Cell cell : entry.getValue()) {
+        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY).setRow(row)
+            .setFamily(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
+            .setQualifier(cell.getQualifierArray(), cell.getQualifierOffset(),
+              cell.getQualifierLength())
+            .setValue(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())
+            .setType(CellBuilder.DataType.Put).setTimestamp(ts).build());
+      }
+    }
+    c.getEnvironment().getRegion().put(put);
+    c.bypass();
+    return Result.EMPTY_RESULT;
+  }
+
+  @Override
+  public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+      ScanOptions options) throws IOException {
+    options.readAllVersions();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index d6d66bb..449726f 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.coprocessor.example;
 
 import java.io.IOException;
-import java.util.List;
 import java.util.Optional;
 import java.util.OptionalLong;
 
@@ -28,20 +27,19 @@ import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.recipes.cache.ChildData;
 import org.apache.curator.framework.recipes.cache.NodeCache;
 import org.apache.curator.retry.RetryForever;
-import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * This is an example showing how a RegionObserver could configured via ZooKeeper in order to
@@ -170,33 +168,24 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
     return OptionalLong.of(Bytes.toLong(bytes));
   }
 
-  private InternalScanner wrap(InternalScanner scanner) {
-    OptionalLong optExpireBefore = getExpireBefore();
-    if (!optExpireBefore.isPresent()) {
-      return scanner;
+  private void resetTTL(ScanOptions options) {
+    OptionalLong expireBefore = getExpireBefore();
+    if (!expireBefore.isPresent()) {
+      return;
     }
-    long expireBefore = optExpireBefore.getAsLong();
-    return new DelegatingInternalScanner(scanner) {
-
-      @Override
-      public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
-        boolean moreRows = scanner.next(result, scannerContext);
-        result.removeIf(c -> c.getTimestamp() < expireBefore);
-        return moreRows;
-      }
-    };
+    options.setTTL(EnvironmentEdgeManager.currentTime() - expireBefore.getAsLong());
   }
 
   @Override
-  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
-    return wrap(scanner);
+  public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanOptions options, FlushLifeCycleTracker tracker) throws IOException {
+    resetTTL(options);
   }
 
   @Override
-  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker,
+  public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
       CompactionRequest request) throws IOException {
-    return wrap(scanner);
+    resetTTL(options);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
new file mode 100644
index 0000000..1881c85
--- /dev/null
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestWriteHeavyIncrementObserver.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.coprocessor.example;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ CoprocessorTests.class, MediumTests.class })
+public class TestWriteHeavyIncrementObserver {
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName NAME = TableName.valueOf("TestCP");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[] ROW = Bytes.toBytes("row");
+
+  private static byte[] CQ1 = Bytes.toBytes("cq1");
+
+  private static byte[] CQ2 = Bytes.toBytes("cq2");
+
+  private static Table TABLE;
+
+  private static long UPPER = 1000;
+
+  private static int THREADS = 10;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
+    UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin()
+        .createTable(TableDescriptorBuilder.newBuilder(NAME)
+            .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
+            .addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
+    TABLE = UTIL.getConnection().getTable(NAME);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    if (TABLE != null) {
+      TABLE.close();
+    }
+    UTIL.shutdownMiniCluster();
+  }
+
+  private static void increment() throws IOException {
+    for (long i = 1; i <= UPPER; i++) {
+      TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
+      try {
+        Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
+      } catch (InterruptedException e) {
+      }
+    }
+  }
+
+  private void assertSum() throws IOException {
+    Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
+    assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
+    assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
+  }
+
+  @Test
+  public void test() throws Exception {
+    Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
+      try {
+        increment();
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }, "increment-" + i)).toArray(Thread[]::new);
+    for (Thread thread : threads) {
+      thread.start();
+    }
+    for (Thread thread : threads) {
+      thread.join();
+    }
+    assertSum();
+    // We do not intercept the scan operation, so a scan still returns the original values added
+    // to the table.
+    try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
+        .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
+      Result r = scanner.next();
+      assertTrue(r.rawCells().length > 2);
+    }
+    UTIL.getAdmin().flush(NAME);
+    UTIL.getAdmin().majorCompact(NAME);
+    HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
+    Waiter.waitFor(UTIL.getConfiguration(), 30000, new Waiter.ExplainingPredicate<Exception>() {
+
+      @Override
+      public boolean evaluate() throws Exception {
+        return store.getStorefilesCount() == 1;
+      }
+
+      @Override
+      public String explainFailure() throws Exception {
+        return "Major compaction hangs, there are still " + store.getStorefilesCount() +
+            " store files";
+      }
+    });
+    assertSum();
+    // Should only have two cells after flush and major compaction
+    try (ResultScanner scanner = TABLE.getScanner(new Scan().withStartRow(ROW)
+        .withStopRow(ROW, true).addFamily(FAMILY).readAllVersions().setAllowPartialResults(true))) {
+      Result r = scanner.next();
+      assertEquals(2, r.rawCells().length);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index cfc8e92..1fdd2f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.regionserver.OperationStatus;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.Region.Operation;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.regionserver.ScanOptions;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
@@ -130,9 +131,19 @@ public interface RegionObserver {
       FlushLifeCycleTracker tracker) throws IOException {}
 
   /**
+   * Called before we open store scanner for flush. You can use the {@code options} to change max
+   * versions and TTL for the scanner being opened.
+   * @param c the environment provided by the region server
+   * @param store the store where flush is being requested
+   * @param options used to change max versions and TTL for the scanner being opened
+   */
+  default void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {}
+
+  /**
    * Called before a Store's memstore is flushed to disk.
    * @param c the environment provided by the region server
-   * @param store the store where compaction is being requested
+   * @param store the store where flush is being requested
    * @param scanner the scanner over existing data used in the store file
    * @param tracker tracker used to track the life cycle of a flush
    * @return the scanner to use during compaction.  Should not be {@code null}
@@ -189,6 +200,20 @@ public interface RegionObserver {
       CompactionRequest request) {}
 
   /**
+   * Called before we open store scanner for compaction. You can use the {@code options} to change max
+   * versions and TTL for the scanner being opened.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanType type of Scan
+   * @param options used to change max versions and TTL for the scanner being opened
+   * @param tracker tracker used to track the life cycle of a compaction
+   * @param request the requested compaction
+   */
+  default void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
+      ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
+      CompactionRequest request) throws IOException {}
+
+  /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into a new
    * {@code StoreFile}.
    * <p>
@@ -858,6 +883,27 @@ public interface RegionObserver {
       InternalScanner s) throws IOException {}
 
   /**
+   * Called before a store opens a new scanner.
+   * <p>
+   * This hook is called when a "user" scanner is opened. Use {@code preFlushScannerOpen} and
+   * {@code preCompactScannerOpen} to hook into flush/compaction.
+   * <p>
+   * Notice that this method is used to change the inherent max versions and TTL for a Store. For
+   * example, you can change the max versions option for a {@link Scan} object to 10 in
+   * {@code preScannerOpen}, but if the max versions config on the Store is 1, then you can still
+   * only read 1 version. You also need to change the max versions to 10 here if you want to get
+   * more versions.
+   * @param ctx the environment provided by the region server
+   * @param store the store which we want to get scanner from
+   * @param options used to change max versions and TTL for the scanner being opened
+   * @see #preFlushScannerOpen(ObserverContext, Store, ScanOptions, FlushLifeCycleTracker)
+   * @see #preCompactScannerOpen(ObserverContext, Store, ScanType, ScanOptions,
+   *      CompactionLifeCycleTracker, CompactionRequest)
+   */
+  default void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx, Store store,
+      ScanOptions options) throws IOException {}
+
+  /**
    * Called before replaying WALs for this region.
    * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
    * effect in this hook.

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index 32552da..447629b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -22,7 +22,6 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.OptionalInt;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.HMobStore;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
@@ -72,10 +72,10 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     }
 
     @Override
-    public InternalScanner createScanner(List<StoreFileScanner> scanners,
+    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
         ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
-      return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
-          smallestReadPoint, fd.earliestPutTs);
+      return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
+          fd.earliestPutTs);
     }
   };
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
new file mode 100644
index 0000000..c3d5e57
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CustomizedScanInfoBuilder.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Helper handed to CP hooks so they can override max versions and TTL. Values that are never
+ * set fall through to the wrapped {@link ScanInfo}.
+ */
+@InterfaceAudience.Private
+public class CustomizedScanInfoBuilder implements ScanOptions {
+
+  private final ScanInfo scanInfo;
+
+  // The two overrides stay null until a hook sets them; null means "use the wrapped value".
+  private Integer maxVersions;
+  private Long ttl;
+
+  public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
+    this.scanInfo = scanInfo;
+  }
+
+  @Override
+  public int getMaxVersions() {
+    return maxVersions == null ? scanInfo.getMaxVersions() : maxVersions.intValue();
+  }
+
+  @Override
+  public void setMaxVersions(int maxVersions) {
+    this.maxVersions = maxVersions;
+  }
+
+  @Override
+  public long getTTL() {
+    return ttl == null ? scanInfo.getTtl() : ttl.longValue();
+  }
+
+  @Override
+  public void setTTL(long ttl) {
+    this.ttl = ttl;
+  }
+
+  /** Returns the wrapped ScanInfo untouched if nothing was overridden, else a customized copy. */
+  public ScanInfo build() {
+    boolean customized = maxVersions != null || ttl != null;
+    return customized ? scanInfo.customize(getMaxVersions(), getTTL()) : scanInfo;
+  }
+
+  @Override
+  public String toString() {
+    return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 206c3cd..5cb1e45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -144,8 +144,8 @@ public class HMobStore extends HStore {
    * the mob files should be performed after the seek in HBase is done.
    */
   @Override
-  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
-      long readPt) throws IOException {
+  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
     if (MobUtils.isRefOnlyScan(scan)) {
       Filter refOnlyFilter = new MobReferenceOnlyFilter();
       Filter filter = scan.getFilter();
@@ -155,9 +155,8 @@ public class HMobStore extends HStore {
         scan.setFilter(refOnlyFilter);
       }
     }
-    return scan.isReversed()
-        ? new ReversedMobStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
-        : new MobStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+    return scan.isReversed() ? new ReversedMobStoreScanner(this, scanInfo, scan, targetCols, readPt)
+        : new MobStoreScanner(this, scanInfo, scan, targetCols, readPt);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 6f71dc9..648a415 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -7853,6 +7853,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   }
 
   /** @param coprocessorHost the new coprocessor host */
+  @VisibleForTesting
   public void setCoprocessorHost(final RegionCoprocessorHost coprocessorHost) {
     this.coprocessorHost = coprocessorHost;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index c0cea4e..7b8ca79 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1909,7 +1909,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
     this.forceMajor = true;
   }
 
-
   //////////////////////////////////////////////////////////////////////////////
   // File administration
   //////////////////////////////////////////////////////////////////////////////
@@ -1922,21 +1921,27 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
    * @return a scanner over the current key values
    * @throws IOException on failure
    */
-  public KeyValueScanner getScanner(Scan scan,
-      final NavigableSet<byte []> targetCols, long readPt) throws IOException {
+  public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols, long readPt)
+      throws IOException {
     lock.readLock().lock();
     try {
-      return createScanner(scan, targetCols, readPt);
+      ScanInfo scanInfo;
+      if (this.getCoprocessorHost() != null) {
+        scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
+      } else {
+        scanInfo = getScanInfo();
+      }
+      return createScanner(scan, scanInfo, targetCols, readPt);
     } finally {
       lock.readLock().unlock();
     }
   }
 
-  protected KeyValueScanner createScanner(Scan scan, final NavigableSet<byte[]> targetCols,
-      long readPt) throws IOException {
-    return scan.isReversed() ? new ReversedStoreScanner(this,
-      getScanInfo(), scan, targetCols, readPt) : new StoreScanner(this,
-      getScanInfo(), scan, targetCols, readPt);
+  // HMobStore will override this method to return its own implementation.
+  protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+      NavigableSet<byte[]> targetCols, long readPt) throws IOException {
+    return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
+        : new StoreScanner(this, scanInfo, scan, targetCols, readPt);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
index b3ba998..7ab2fe3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreCompactorSegmentsIterator.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.util.OptionalInt;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
@@ -108,9 +107,11 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
    */
   private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
       throws IOException {
-    // Get all available versions
-    return new StoreScanner(store, store.getScanInfo(), OptionalInt.of(Integer.MAX_VALUE), scanners,
-        ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
+    // FIXME: This is the old comment 'Get all available versions'
+    // But actually if we really reset the ScanInfo to get all available versions then lots of UTs
+    // will fail
+    return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
+        store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
 
   /* Refill kev-value set (should be invoked only when KVS is empty)

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
index c242fd1..c5a3de3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
@@ -1,5 +1,4 @@
-/*
- *
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -623,6 +622,21 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Called prior to opening store scanner for compaction; returns the CP-customized ScanInfo.
+   */
+  public ScanInfo preCompactScannerOpen(HStore store, ScanType scanType,
+      CompactionLifeCycleTracker tracker, CompactionRequest request, User user) throws IOException {
+    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation(user) {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preCompactScannerOpen(this, store, scanType, builder, tracker, request);
+      }
+    });
+    return builder.build();
+  }
+
+  /**
    * Called prior to rewriting the store files selected for compaction
    * @param store the store being compacted
    * @param scanner the scanner used to read store data during compaction
@@ -666,6 +680,22 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Invoked before creating the StoreScanner for flush; returns the CP-customized ScanInfo.
+   * @throws IOException if raised while running the coprocessor hooks
+   */
+  public ScanInfo preFlushScannerOpen(HStore store, FlushLifeCycleTracker tracker)
+      throws IOException {
+    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preFlushScannerOpen(this, store, builder, tracker);
+      }
+    });
+    return builder.build();
+  }
+
+  /**
    * Invoked before a memstore flush
    * @throws IOException
    */
@@ -1264,6 +1294,20 @@ public class RegionCoprocessorHost
   }
 
   /**
+   * Called before opening the store scanner for a user scan; returns the CP-customized ScanInfo.
+   */
+  public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
+    CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
+    execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
+      @Override
+      public void call(RegionObserver observer) throws IOException {
+        observer.preStoreScannerOpen(this, store, builder);
+      }
+    });
+    return builder.build();
+  }
+
+  /**
    * @param info the RegionInfo for this region
    * @param edits the file of recovered edits
    * @throws IOException Exception

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
index 8e48c69..4e5cb70 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java
@@ -49,7 +49,6 @@ public class ScanInfo {
   private long cellsPerTimeoutCheck;
   private boolean parallelSeekEnabled;
   private final long preadMaxBytes;
-  private final Configuration conf;
   private final boolean newVersionBehavior;
 
   public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@@ -64,10 +63,18 @@ public class ScanInfo {
    *          major compaction.
    * @param comparator The store's comparator
    */
-  public ScanInfo(final Configuration conf, final ColumnFamilyDescriptor family, final long ttl,
-      final long timeToPurgeDeletes, final CellComparator comparator) {
+  public ScanInfo(Configuration conf, ColumnFamilyDescriptor family, long ttl,
+      long timeToPurgeDeletes, CellComparator comparator) {
     this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
-        family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator, family.isNewVersionBehavior());
+        family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator,
+        family.isNewVersionBehavior());
+  }
+
+  private static long getCellsPerTimeoutCheck(Configuration conf) {
+    // A non-positive configured value is replaced by the built-in default.
+    long fallback = StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
+    long configured = conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, fallback);
+    return configured <= 0 ? fallback : configured;
   }
 
   /**
@@ -82,10 +89,20 @@ public class ScanInfo {
    * @param keepDeletedCells Store's keepDeletedCells setting
    * @param comparator The store's comparator
    */
-  public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
-      final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
-      final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator,
-      final boolean newVersionBehavior) {
+  public ScanInfo(Configuration conf, byte[] family, int minVersions, int maxVersions, long ttl,
+      KeepDeletedCells keepDeletedCells, long blockSize, long timeToPurgeDeletes,
+      CellComparator comparator, boolean newVersionBehavior) {
+    this(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes, comparator,
+        conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT),
+        conf.getBoolean("hbase.storescanner.use.pread", false), getCellsPerTimeoutCheck(conf),
+        conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false),
+        conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize), newVersionBehavior);
+  }
+
+  private ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
+      KeepDeletedCells keepDeletedCells, long timeToPurgeDeletes, CellComparator comparator,
+      long tableMaxRowSize, boolean usePread, long cellsPerTimeoutCheck,
+      boolean parallelSeekEnabled, long preadMaxBytes, boolean newVersionBehavior) {
     this.family = family;
     this.minVersions = minVersions;
     this.maxVersions = maxVersions;
@@ -93,25 +110,14 @@ public class ScanInfo {
     this.keepDeletedCells = keepDeletedCells;
     this.timeToPurgeDeletes = timeToPurgeDeletes;
     this.comparator = comparator;
-    this.tableMaxRowSize =
-      conf.getLong(HConstants.TABLE_MAX_ROWSIZE_KEY, HConstants.TABLE_MAX_ROWSIZE_DEFAULT);
-    this.usePread = conf.getBoolean("hbase.storescanner.use.pread", false);
-    long perHeartbeat =
-      conf.getLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK,
-        StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK);
-    this.cellsPerTimeoutCheck = perHeartbeat > 0?
-        perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
-    this.parallelSeekEnabled =
-      conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
-    this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
-    this.conf = conf;
+    this.tableMaxRowSize = tableMaxRowSize;
+    this.usePread = usePread;
+    this.cellsPerTimeoutCheck = cellsPerTimeoutCheck;
+    this.parallelSeekEnabled = parallelSeekEnabled;
+    this.preadMaxBytes = preadMaxBytes;
     this.newVersionBehavior = newVersionBehavior;
   }
 
-  public Configuration getConfiguration() {
-    return this.conf;
-  }
-
   long getTableMaxRowSize() {
     return this.tableMaxRowSize;
   }
@@ -163,4 +169,12 @@ public class ScanInfo {
   public boolean isNewVersionBehavior() {
     return newVersionBehavior;
   }
+
+  /** Used by CP hooks: returns a copy with the given max versions and ttl, all else unchanged. */
+  ScanInfo customize(int maxVersions, long ttl) {
+    // Only maxVersions and ttl change; every other argument is this instance's own value.
+    return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes,
+        comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
+        preadMaxBytes, newVersionBehavior);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
new file mode 100644
index 0000000..5a35d51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanOptions.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * This class gives you the ability to change the max versions and TTL options before opening a
+ * scanner for a Store. It also gives you some information about the scan.
+ * <p>
+ * Changing max versions and TTL is usually safe even for flush/compaction, so here we provide a
+ * way to do it for you. If you want to do other complicated things such as filtering, please wrap
+ * the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in
+ * {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
+ * <p>
+ * For user scans, we also provide this class as a parameter in the {@code preStoreScannerOpen}
+ * method in {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}. You can use it to change
+ * the inherent properties for a Store. For example, even if you use {@code Scan.readAllVersions},
+ * you still can not read two versions if the max versions property of the Store is one. You need to
+ * set the max versions to a value of at least two in {@code preStoreScannerOpen}.
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preFlushScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ *      Store, ScanOptions, FlushLifeCycleTracker)
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preCompactScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ *      Store, ScanType, ScanOptions,
+ *      org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker,
+ *      org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest)
+ * @see org.apache.hadoop.hbase.coprocessor.RegionObserver#preStoreScannerOpen(org.apache.hadoop.hbase.coprocessor.ObserverContext,
+ *      Store, ScanOptions)
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
+@InterfaceStability.Evolving
+public interface ScanOptions {
+  /** Returns the max versions the scanner about to be opened will use. */
+  int getMaxVersions();
+  /** Overrides the max versions for the scanner about to be opened. */
+  void setMaxVersions(int maxVersions);
+  /** Convenience for {@code setMaxVersions(Integer.MAX_VALUE)}. */
+  default void readAllVersions() {
+    setMaxVersions(Integer.MAX_VALUE);
+  }
+  /** Returns the TTL the scanner about to be opened will use. */
+  long getTTL();
+  /** Overrides the TTL. NOTE(review): the time unit is not visible here; confirm via ScanInfo. */
+  void setTTL(long ttl);
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index b0bff10..442d47d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -80,9 +80,14 @@ abstract class StoreFlusher {
    */
   protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
       long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
-    InternalScanner scanner =
-        new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
-            ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+    ScanInfo scanInfo;
+    if (store.getCoprocessorHost() != null) {
+      scanInfo = store.getCoprocessorHost().preFlushScannerOpen(store, tracker);
+    } else {
+      scanInfo = store.getScanInfo();
+    }
+    InternalScanner scanner = new StoreScanner(store, scanInfo, snapshotScanners,
+        ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
     assert scanner != null;
     if (store.getCoprocessorHost() != null) {
       try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 43079a6..b2389eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -23,7 +23,6 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.ReentrantLock;
@@ -67,7 +66,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   private static final Log LOG = LogFactory.getLog(StoreScanner.class);
   // In unit tests, the store could be null
-  protected final Optional<HStore> store;
+  protected final HStore store;
   private ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
   private boolean cacheBlocks;
@@ -160,7 +159,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   private boolean topChanged = false;
 
   /** An internal constructor. */
-  private StoreScanner(Optional<HStore> store, Scan scan, ScanInfo scanInfo,
+  private StoreScanner(HStore store, Scan scan, ScanInfo scanInfo,
       int numColumns, long readPt, boolean cacheBlocks, ScanType scanType) {
     this.readPt = readPt;
     this.store = store;
@@ -199,15 +198,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this.preadMaxBytes = scanInfo.getPreadMaxBytes();
     this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
     // Parallel seeking is on if the config allows and more there is more than one store file.
-    this.store.ifPresent(s -> {
-      if (s.getStorefilesCount() > 1) {
-        RegionServerServices rsService = ((HStore) s).getHRegion().getRegionServerServices();
-        if (rsService != null && scanInfo.isParallelSeekEnabled()) {
-          this.parallelSeekEnabled = true;
-          this.executor = rsService.getExecutorService();
-        }
+    if (store != null && store.getStorefilesCount() > 1) {
+      RegionServerServices rsService = store.getHRegion().getRegionServerServices();
+      if (rsService != null && scanInfo.isParallelSeekEnabled()) {
+        this.parallelSeekEnabled = true;
+        this.executor = rsService.getExecutorService();
       }
-    });
+    }
   }
 
   private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
@@ -225,7 +222,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    */
   public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
       long readPt) throws IOException {
-    this(Optional.of(store), scan, scanInfo, columns != null ? columns.size() : 0, readPt,
+    this(store, scan, scanInfo, columns != null ? columns.size() : 0, readPt,
         scan.getCacheBlocks(), ScanType.USER_SCAN);
     if (columns != null && scan.isRaw()) {
       throw new DoNotRetryIOException("Cannot specify any column for a raw scan");
@@ -275,11 +272,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @param scanners ancillary scanners
    * @param smallestReadPoint the readPoint that we should use for tracking versions
    */
-  public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
-      long earliestPutTs) throws IOException {
-    this(store, scanInfo, maxVersions, scanners, scanType, smallestReadPoint, earliestPutTs, null,
-        null);
+  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+    this(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
   }
 
   /**
@@ -292,21 +287,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
    */
-  public StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
-      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
-    this(store, scanInfo, maxVersions, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
+  public StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+      byte[] dropDeletesToRow) throws IOException {
+    this(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
         earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
   }
 
-  private StoreScanner(HStore store, ScanInfo scanInfo, OptionalInt maxVersions,
-      List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
-      long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
-    this(Optional.of(store),
-        maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
-            : SCAN_FOR_COMPACTION,
-        scanInfo, 0, store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED),
-        false, scanType);
+  private StoreScanner(HStore store, ScanInfo scanInfo, List<? extends KeyValueScanner> scanners,
+      ScanType scanType, long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
+      byte[] dropDeletesToRow) throws IOException {
+    this(store, SCAN_FOR_COMPACTION, scanInfo, 0,
+        store.getHRegion().getReadPoint(IsolationLevel.READ_COMMITTED), false, scanType);
     assert scanType != ScanType.USER_SCAN;
     matcher =
         CompactionScanQueryMatcher.create(scanInfo, scanType, smallestReadPoint, earliestPutTs,
@@ -333,7 +325,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   // For mob compaction only as we do not have a Store instance when doing mob compaction.
   public StoreScanner(ScanInfo scanInfo, ScanType scanType,
       List<? extends KeyValueScanner> scanners) throws IOException {
-    this(Optional.empty(), SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
+    this(null, SCAN_FOR_COMPACTION, scanInfo, 0, Long.MAX_VALUE, false, scanType);
     assert scanType != ScanType.USER_SCAN;
     this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE, 0L,
       oldestUnexpiredTS, now, null, null, null);
@@ -345,7 +337,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   StoreScanner(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
       List<? extends KeyValueScanner> scanners) throws IOException {
     // 0 is passed as readpoint because the test bypasses Store
-    this(Optional.empty(), scan, scanInfo, columns != null ? columns.size() : 0, 0L,
+    this(null, scan, scanInfo, columns != null ? columns.size() : 0, 0L,
         scan.getCacheBlocks(), ScanType.USER_SCAN);
     this.matcher =
         UserScanQueryMatcher.create(scan, scanInfo, columns, oldestUnexpiredTS, now, null);
@@ -357,7 +349,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   StoreScanner(ScanInfo scanInfo, OptionalInt maxVersions, ScanType scanType,
       List<? extends KeyValueScanner> scanners) throws IOException {
     // 0 is passed as readpoint because the test bypasses Store
-    this(Optional.empty(), maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
+    this(null, maxVersions.isPresent() ? new Scan().readVersions(maxVersions.getAsInt())
         : SCAN_FOR_COMPACTION, scanInfo, 0, 0L, false, scanType);
     this.matcher = CompactionScanQueryMatcher.create(scanInfo, scanType, Long.MAX_VALUE,
       HConstants.OLDEST_TIMESTAMP, oldestUnexpiredTS, now, null, null, null);
@@ -478,7 +470,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       this.closing = true;
     }
     // For mob compaction, we do not have a store.
-    this.store.ifPresent(s -> s.deleteChangedReaderObserver(this));
+    if (this.store != null) {
+      this.store.deleteChangedReaderObserver(this);
+    }
     if (withDelayedScannersClose) {
       clearAndClose(scannersForDelayedClose);
       clearAndClose(memStoreScannersAfterFlush);
@@ -550,8 +544,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
 
     // Only do a sanity-check if store and comparator are available.
-    CellComparator comparator =
-        store.map(s -> s.getComparator()).orElse(null);
+    CellComparator comparator = store != null ? store.getComparator() : null;
 
     int count = 0;
     long totalBytesRead = 0;
@@ -864,8 +857,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
    * @return if top of heap has changed (and KeyValueHeap has to try the next KV)
    */
   protected final boolean reopenAfterFlush() throws IOException {
-    // here we can make sure that we have a Store instance.
-    HStore store = this.store.get();
+    // here we can make sure that we have a Store instance so no null check on store.
     Cell lastTop = heap.peek();
     // When we have the scan object, should we not pass it to getScanners() to get a limited set of
     // scanners? We did so in the constructor and we could have done it now by storing the scan
@@ -992,9 +984,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     List<KeyValueScanner> fileScanners = null;
     List<KeyValueScanner> newCurrentScanners;
     KeyValueHeap newHeap;
-    // We must have a store instance here
-    HStore store = this.store.get();
     try {
+      // We must have a store instance here so no null check
       // recreate the scanners on the current file scanners
       fileScanners = store.recreateScanners(scannersToClose, cacheBlocks, false, false,
         matcher, scan.getStartRow(), scan.includeStartRow(), scan.getStopRow(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 6ed8fef..817ddf8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver.compactions;
 
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.EARLIEST_PUT_TS;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.TIMERANGE_KEY;
+import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_DROP_DELETES;
+import static org.apache.hadoop.hbase.regionserver.ScanType.COMPACT_RETAIN_DELETES;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -26,7 +28,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -40,10 +41,12 @@ import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
 import org.apache.hadoop.hbase.regionserver.CellSink;
+import org.apache.hadoop.hbase.regionserver.CustomizedScanInfoBuilder;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
@@ -230,7 +233,7 @@ public abstract class Compactor<T extends CellSink> {
 
     ScanType getScanType(CompactionRequestImpl request);
 
-    InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
+    InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners, ScanType scanType,
         FileDetails fd, long smallestReadPoint) throws IOException;
   }
 
@@ -238,14 +241,13 @@ public abstract class Compactor<T extends CellSink> {
 
     @Override
     public ScanType getScanType(CompactionRequestImpl request) {
-      return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
-          : ScanType.COMPACT_RETAIN_DELETES;
+      return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
     }
 
     @Override
-    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-        FileDetails fd, long smallestReadPoint) throws IOException {
-      return Compactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
+    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
+        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
+      return Compactor.this.createScanner(store, scanInfo, scanners, scanType, smallestReadPoint,
         fd.earliestPutTs);
     }
   };
@@ -266,6 +268,31 @@ public abstract class Compactor<T extends CellSink> {
     /* includesTags = */fd.maxTagsLength > 0, shouldDropBehind);
   }
 
+  private ScanInfo preCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
+      User user) throws IOException {
+    if (store.getCoprocessorHost() == null) {
+      return store.getScanInfo();
+    }
+    return store.getCoprocessorHost().preCompactScannerOpen(store, scanType, request.getTracker(),
+      request, user);
+  }
+
+  /**
+   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
+   * @param request Compaction request.
+   * @param scanType Scan type.
+   * @param scanner The default scanner created for compaction.
+   * @return The scanner to use (usually the default); null if compaction should not proceed.
+   */
+  private InternalScanner postCompactScannerOpen(CompactionRequestImpl request, ScanType scanType,
+      InternalScanner scanner, User user) throws IOException {
+    if (store.getCoprocessorHost() == null) {
+      return scanner;
+    }
+    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
+      request, user);
+  }
+
   protected final List<Path> compact(final CompactionRequestImpl request,
       InternalScannerFactory scannerFactory, CellSinkFactory<T> sinkFactory,
       ThroughputController throughputController, User user) throws IOException {
@@ -290,8 +317,9 @@ public abstract class Compactor<T extends CellSink> {
     try {
       /* Include deletes, unless we are doing a major compaction */
       ScanType scanType = scannerFactory.getScanType(request);
-      scanner = postCreateCoprocScanner(request, scanType,
-        scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint), user);
+      ScanInfo scanInfo = preCompactScannerOpen(request, scanType, user);
+      scanner = postCompactScannerOpen(request, scanType,
+        scannerFactory.createScanner(scanInfo, scanners, scanType, fd, smallestReadPoint), user);
       if (scanner == null) {
         // NULL scanner returned from coprocessor hooks means skip normal processing.
         return new ArrayList<>();
@@ -326,22 +354,6 @@ public abstract class Compactor<T extends CellSink> {
   protected abstract void abortWriter(T writer) throws IOException;
 
   /**
-   * Calls coprocessor, if any, to create scanners - after normal scanner creation.
-   * @param request Compaction request.
-   * @param scanType Scan type.
-   * @param scanner The default scanner created for compaction.
-   * @return Scanner scanner to use (usually the default); null if compaction should not proceed.
-   */
-  private InternalScanner postCreateCoprocScanner(CompactionRequestImpl request, ScanType scanType,
-      InternalScanner scanner, User user) throws IOException {
-    if (store.getCoprocessorHost() == null) {
-      return scanner;
-    }
-    return store.getCoprocessorHost().preCompact(store, scanner, scanType, request.getTracker(),
-      request, user);
-  }
-
-  /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
@@ -475,10 +487,10 @@ public abstract class Compactor<T extends CellSink> {
    * @param earliestPutTs Earliest put across all files.
    * @return A compaction scanner.
    */
-  protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-      ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
-    return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners, scanType,
-        smallestReadPoint, earliestPutTs);
+  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+      List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+      long earliestPutTs) throws IOException {
+    return new StoreScanner(store, scanInfo, scanners, scanType, smallestReadPoint, earliestPutTs);
   }
 
   /**
@@ -490,10 +502,10 @@ public abstract class Compactor<T extends CellSink> {
    * @param dropDeletesToRow Drop deletes ending with this row, exclusive. Can be null.
    * @return A compaction scanner.
    */
-  protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-      long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-      byte[] dropDeletesToRow) throws IOException {
-    return new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), scanners,
-        smallestReadPoint, earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+  protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+      List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+    return new StoreScanner(store, scanInfo, scanners, smallestReadPoint, earliestPutTs,
+        dropDeletesFromRow, dropDeletesToRow);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index f4836a8..c9e591e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
@@ -66,13 +67,13 @@ public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFil
     }
 
     @Override
-    public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
-        FileDetails fd, long smallestReadPoint) throws IOException {
+    public InternalScanner createScanner(ScanInfo scanInfo, List<StoreFileScanner> scanners,
+        ScanType scanType, FileDetails fd, long smallestReadPoint) throws IOException {
       return (majorRangeFromRow == null)
-          ? StripeCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
-            fd.earliestPutTs)
-          : StripeCompactor.this.createScanner(store, scanners, smallestReadPoint, fd.earliestPutTs,
-            majorRangeFromRow, majorRangeToRow);
+          ? StripeCompactor.this.createScanner(store, scanInfo, scanners, scanType,
+            smallestReadPoint, fd.earliestPutTs)
+          : StripeCompactor.this.createScanner(store, scanInfo, scanners, smallestReadPoint,
+            fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
index f18ccc0..6b16b08 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSideScanExcpetion.java
@@ -124,11 +124,10 @@ public class TestFromClientSideScanExcpetion {
     }
 
     @Override
-    protected KeyValueScanner createScanner(Scan scan, NavigableSet<byte[]> targetCols, long readPt)
-        throws IOException {
-      return scan.isReversed()
-          ? new ReversedStoreScanner(this, getScanInfo(), scan, targetCols, readPt)
-          : new MyStoreScanner(this, getScanInfo(), scan, targetCols, readPt);
+    protected KeyValueScanner createScanner(Scan scan, ScanInfo scanInfo,
+        NavigableSet<byte[]> targetCols, long readPt) throws IOException {
+      return scan.isReversed() ? new ReversedStoreScanner(this, scanInfo, scan, targetCols, readPt)
+          : new MyStoreScanner(this, scanInfo, scan, targetCols, readPt);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 6316809..7248f56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -211,11 +211,9 @@ public class TestCompaction {
       // Multiple versions allowed for an entry, so the delete isn't enough
       // Lower TTL and expire to ensure that all our entries have been wiped
       final int ttl = 1000;
-      for (HStore store: this.r.stores.values()) {
+      for (HStore store : this.r.stores.values()) {
         ScanInfo old = store.getScanInfo();
-        ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
-            old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
-            old.getComparator(), old.isNewVersionBehavior());
+        ScanInfo si = old.customize(old.getMaxVersions(), ttl);
         store.setScanInfo(si);
       }
       Thread.sleep(ttl);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
index c6c0bdc..6038bb2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
@@ -158,10 +158,7 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
   public void testCompactionEmptyHFile() throws IOException {
     // Set TTL
     ScanInfo oldScanInfo = store.getScanInfo();
-    ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
-        oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
-        oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
-        oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator(), oldScanInfo.isNewVersionBehavior());
+    ScanInfo newScanInfo = oldScanInfo.customize(oldScanInfo.getMaxVersions(), 600);
     store.setScanInfo(newScanInfo);
     // Do not compact empty store file
     List<HStoreFile> candidates = sfCreate(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 4499cd5..268b352 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -403,6 +403,8 @@ public class TestHRegion {
     RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
     when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
       Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
+    when(mockedCPHost.preFlushScannerOpen(Mockito.isA(HStore.class),
+      Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(store.getScanInfo());
     region.setCoprocessorHost(mockedCPHost);
     region.put(put);
     region.flush(true);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
index f443705..2a556d7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java
@@ -292,9 +292,7 @@ public class TestMajorCompaction {
     final int ttl = 1000;
     for (HStore store : r.getStores()) {
       ScanInfo old = store.getScanInfo();
-      ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
-          old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
-          old.getComparator(), old.isNewVersionBehavior());
+      ScanInfo si = old.customize(old.getMaxVersions(), ttl);
       store.setScanInfo(si);
     }
     Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
index a5a0e78..95c2c56 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestDateTieredCompactor.java
@@ -110,15 +110,16 @@ public class TestDateTieredCompactor {
 
     return new DateTieredCompactor(conf, store) {
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
         return scanner;
       }
 
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+          long earliestPutTs) throws IOException {
         return scanner;
       }
     };

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
index af30b7c..48e560c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.HStoreFile;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
@@ -787,15 +788,16 @@ public class TestStripeCompactionPolicy {
     final Scanner scanner = new Scanner();
     return new StripeCompactor(conf, store) {
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
         return scanner;
       }
 
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+          long earliestPutTs) throws IOException {
         return scanner;
       }
     };

http://git-wip-us.apache.org/repos/asf/hbase/blob/15388d4e/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
index dbf95f3..772a674 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactor.java
@@ -207,15 +207,16 @@ public class TestStripeCompactor {
 
     return new StripeCompactor(conf, store) {
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
-          byte[] dropDeletesToRow) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, long smallestReadPoint, long earliestPutTs,
+          byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
         return scanner;
       }
 
       @Override
-      protected InternalScanner createScanner(HStore store, List<StoreFileScanner> scanners,
-          ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
+      protected InternalScanner createScanner(HStore store, ScanInfo scanInfo,
+          List<StoreFileScanner> scanners, ScanType scanType, long smallestReadPoint,
+          long earliestPutTs) throws IOException {
         return scanner;
       }
     };


Mime
View raw message