hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bus...@apache.org
Subject [05/47] hbase git commit: HBASE-18453 CompactionRequest should not be exposed to user directly
Date Sun, 24 Sep 2017 00:34:05 GMT
HBASE-18453 CompactionRequest should not be exposed to user directly


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/61d10fef
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/61d10fef
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/61d10fef

Branch: refs/heads/HBASE-18467
Commit: 61d10feffaa7b96ee46e2a6f1e542d80c1d76f42
Parents: 38e983e
Author: zhangduo <zhangduo@apache.org>
Authored: Mon Sep 11 08:50:37 2017 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Thu Sep 14 20:37:33 2017 +0800

----------------------------------------------------------------------
 .../example/ZooKeeperScanPolicyObserver.java    |   3 +-
 .../example/TestRefreshHFilesEndpoint.java      |  11 +-
 .../hbase/regionserver/CompactionTool.java      |  11 +-
 .../hbase/coprocessor/RegionObserver.java       |  36 +-
 .../hadoop/hbase/regionserver/CompactSplit.java | 233 ++++----
 .../hbase/regionserver/CompactionRequestor.java | 100 ----
 .../regionserver/FlushAllLargeStoresPolicy.java |  18 +-
 .../regionserver/FlushAllStoresPolicy.java      |   2 +-
 .../regionserver/FlushLargeStoresPolicy.java    |   2 +-
 .../FlushNonSloppyStoresFirstPolicy.java        |  29 +-
 .../hadoop/hbase/regionserver/FlushPolicy.java  |   2 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 590 +++++++++----------
 .../hbase/regionserver/HRegionServer.java       |  53 +-
 .../hadoop/hbase/regionserver/HStore.java       |  60 +-
 .../hbase/regionserver/MemStoreFlusher.java     |  12 +-
 .../MetricsRegionServerWrapperImpl.java         |   2 +-
 .../regionserver/MetricsRegionWrapperImpl.java  |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  20 +-
 .../hadoop/hbase/regionserver/Region.java       |  34 +-
 .../regionserver/RegionCoprocessorHost.java     |  60 +-
 .../regionserver/RegionServerServices.java      |   5 -
 .../hbase/regionserver/RegionSplitPolicy.java   |   2 +-
 .../apache/hadoop/hbase/regionserver/Store.java |  20 +-
 .../compactions/CompactionLifeCycleTracker.java |  52 ++
 .../compactions/CompactionRequest.java          |  71 +--
 .../regionserver/compactions/Compactor.java     |  15 +-
 .../hbase/security/access/AccessController.java |   4 +-
 .../hbase-webapps/regionserver/region.jsp       |   2 +-
 .../hadoop/hbase/MockRegionServerServices.java  |  14 +-
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   4 +-
 ...estAvoidCellReferencesIntoShippedBlocks.java |   4 +-
 .../client/TestBlockEvictionFromClient.java     |   2 +-
 .../hbase/coprocessor/SimpleRegionObserver.java |  13 +-
 .../coprocessor/TestCoprocessorInterface.java   |   5 +-
 .../TestRegionObserverInterface.java            |   6 +-
 .../TestRegionObserverScannerOpenHook.java      |   8 +-
 .../hadoop/hbase/master/MockRegionServer.java   |  52 +-
 .../hbase/mob/compactions/TestMobCompactor.java |  12 +-
 .../hbase/namespace/TestNamespaceAuditor.java   |   3 +-
 .../quotas/TestFileSystemUtilizationChore.java  |   6 +-
 .../regionserver/NoOpScanPolicyObserver.java    |   3 +-
 .../regionserver/StatefulStoreMockMaker.java    |  43 +-
 .../hbase/regionserver/TestCompaction.java      |  70 +--
 .../hbase/regionserver/TestHMobStore.java       |   7 +-
 .../regionserver/TestHRegionServerBulkLoad.java |   4 +-
 .../hbase/regionserver/TestMajorCompaction.java |   7 +-
 .../TestSplitTransactionOnCluster.java          |  15 +-
 .../regionserver/TestSplitWalDataLoss.java      |   2 +-
 .../hadoop/hbase/regionserver/TestStore.java    |  17 +-
 .../regionserver/wal/AbstractTestWALReplay.java |   6 +-
 .../hbase/util/TestCoprocessorScanPolicy.java   |   3 +-
 51 files changed, 800 insertions(+), 957 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
index 344d188..6b31664 100644
--- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
+++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -203,7 +204,7 @@ public class ZooKeeperScanPolicyObserver implements RegionObserver {
   @Override
   public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment>
c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long
earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException
{
     ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
index a037f85..257b075 100644
--- a/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
+++ b/hbase-examples/src/test/java/org/apache/hadoop/hbase/coprocessor/example/TestRefreshHFilesEndpoint.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -144,15 +145,17 @@ public class TestRefreshHFilesEndpoint {
     }
 
     @Override
-    public List<Store> getStores() {
-      List<Store> list = new ArrayList<Store>(stores.size());
+    public List<HStore> getStores() {
+      List<HStore> list = new ArrayList<>(stores.size());
       /**
        * This is used to trigger the custom definition (faulty)
        * of refresh HFiles API.
        */
       try {
-        if (this.store == null)
-          store = new HStoreWithFaultyRefreshHFilesAPI(this, new HColumnDescriptor(FAMILY),
this.conf);
+        if (this.store == null) {
+          store = new HStoreWithFaultyRefreshHFilesAPI(this,
+              ColumnFamilyDescriptorBuilder.of(FAMILY), this.conf);
+        }
         list.add(store);
       } catch (IOException ioe) {
         LOG.info("Couldn't instantiate custom store implementation", ioe);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index de59c20..bb01459 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -42,6 +43,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -158,10 +160,13 @@ public class CompactionTool extends Configured implements Tool {
         store.triggerMajorCompaction();
       }
       do {
-        CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
-        if (compaction == null) break;
+        Optional<CompactionContext> compaction =
+            store.requestCompaction(Store.PRIORITY_USER, CompactionLifeCycleTracker.DUMMY,
null);
+        if (!compaction.isPresent()) {
+          break;
+        }
         List<StoreFile> storeFiles =
-            store.compact(compaction, NoLimitThroughputController.INSTANCE);
+            store.compact(compaction.get(), NoLimitThroughputController.INSTANCE);
         if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
             for (StoreFile storeFile: storeFiles) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
index b036608..ae57747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
@@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileReader;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.querymatcher.DeleteTracker;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
 import org.apache.hadoop.hbase.util.Pair;
@@ -186,10 +186,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store where compaction is being requested
    * @param candidates the store files currently available for compaction
-   * @param request custom compaction request
+   * @param tracker tracker used to track the life cycle of a compaction
    */
   default void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
Store store,
-      List<StoreFile> candidates, CompactionRequest request) throws IOException {}
+      List<StoreFile> candidates, CompactionLifeCycleTracker tracker) throws IOException
{}
 
   /**
    * Called after the {@link StoreFile}s to compact have been selected from the available
@@ -197,10 +197,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param selected the store files selected to compact
-   * @param request custom compaction request
+   * @param tracker tracker used to track the life cycle of a compaction
    */
   default void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
Store store,
-      ImmutableList<StoreFile> selected, CompactionRequest request) {}
+      ImmutableList<StoreFile> selected, CompactionLifeCycleTracker tracker) {}
 
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into a new
@@ -220,13 +220,13 @@ public interface RegionObserver extends Coprocessor {
    * @param store the store being compacted
    * @param scanner the scanner over existing data used in the store file rewriting
    * @param scanType type of Scan
-   * @param request the requested compaction
+   * @param tracker tracker used to track the life cycle of a compaction
    * @return the scanner to use during compaction. Should not be {@code null} unless the
    *         implementation is writing new store files on its own.
    */
-  default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment>
c,
-      Store store, InternalScanner scanner, ScanType scanType,
-      CompactionRequest request) throws IOException {
+  default InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment>
c, Store store,
+      InternalScanner scanner, ScanType scanType, CompactionLifeCycleTracker tracker)
+      throws IOException {
     return scanner;
   }
 
@@ -245,14 +245,14 @@ public interface RegionObserver extends Coprocessor {
    * @param earliestPutTs timestamp of the earliest put that was found in any of the involved
store
    *          files
    * @param s the base scanner, if not {@code null}, from previous RegionObserver in the
chain
-   * @param request compaction request
+   * @param tracker used to track the life cycle of a compaction
    * @param readPoint the readpoint to create scanner
    * @return the scanner to use during compaction. {@code null} if the default implementation
is to
    *          be used.
    */
   default InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment>
c,
       Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long
earliestPutTs,
-      InternalScanner s, CompactionRequest request, long readPoint) throws IOException {
+      InternalScanner s, CompactionLifeCycleTracker tracker, long readPoint) throws IOException
{
     return s;
   }
 
@@ -261,10 +261,10 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param resultFile the new store file written out during compaction
-   * @param request the requested compaction
+   * @param tracker used to track the life cycle of a compaction
    */
   default void postCompact(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
-      StoreFile resultFile, CompactionRequest request) throws IOException {}
+      StoreFile resultFile, CompactionLifeCycleTracker tracker) throws IOException {}
 
   /**
    * Called before the region is reported as closed to the master.
@@ -798,12 +798,12 @@ public interface RegionObserver extends Coprocessor {
    * Called before a store opens a new scanner.
    * This hook is called when a "user" scanner is opened.
    * <p>
-   * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
and {@link #preCompactScannerOpen(ObserverContext,
-   *  Store, List, ScanType, long, InternalScanner, CompactionRequest, long)}
-   * to override scanners created for flushes or compactions, resp.
+   * See {@link #preFlushScannerOpen(ObserverContext, Store, List, InternalScanner, long)}
+   * and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long,
+   * InternalScanner, CompactionLifeCycleTracker, long)} to override scanners created for
flushes
+   * or compactions, resp.
    * <p>
-   * Call CoprocessorEnvironment#complete to skip any subsequent chained
-   * coprocessors.
+   * Call CoprocessorEnvironment#complete to skip any subsequent chained coprocessors.
    * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
    * effect in this hook.
    * <p>

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 621bead..cdeeff7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -21,10 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
-import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
-import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
@@ -41,24 +40,23 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.StealJobQueue;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
-
 /**
  * Compact region on request and then run split if appropriate
  */
 @InterfaceAudience.Private
-public class CompactSplit implements CompactionRequestor, PropagatingConfigurationObserver
{
+public class CompactSplit implements PropagatingConfigurationObserver {
   private static final Log LOG = LogFactory.getLog(CompactSplit.class);
 
   // Configuration key for the large compaction threads.
@@ -233,126 +231,89 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     }
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final
String why)
-      throws IOException {
-    return requestCompaction(r, why, null);
+  public synchronized void requestCompaction(HRegion region, String why, int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, why, priority, true, tracker, user);
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final
String why,
-      List<Pair<CompactionRequest, Store>> requests) throws IOException {
-    return requestCompaction(r, why, Store.NO_PRIORITY, requests, null);
-  }
-
-  @Override
-  public synchronized CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, CompactionRequest request) throws IOException {
-    return requestCompaction(r, s, why, Store.NO_PRIORITY, request, null);
+  public synchronized void requestCompaction(HRegion region, HStore store, String why, int
priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    requestCompactionInternal(region, store, why, priority, true, tracker, user);
   }
 
-  @Override
-  public synchronized List<CompactionRequest> requestCompaction(final Region r, final
String why,
-      int p, List<Pair<CompactionRequest, Store>> requests, User user) throws
IOException {
-    return requestCompactionInternal(r, why, p, requests, true, user);
-  }
-
-  private List<CompactionRequest> requestCompactionInternal(final Region r, final String
why,
-      int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow,
User user)
-          throws IOException {
-    // not a special compaction request, so make our own list
-    List<CompactionRequest> ret = null;
-    if (requests == null) {
-      ret = selectNow ? new ArrayList<CompactionRequest>(r.getStores().size()) : null;
-      for (Store s : r.getStores()) {
-        CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
-        if (selectNow) ret.add(cr);
-      }
-    } else {
-      Preconditions.checkArgument(selectNow); // only system requests have selectNow == false
-      ret = new ArrayList<CompactionRequest>(requests.size());
-      for (Pair<CompactionRequest, Store> pair : requests) {
-        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst(), user));
-      }
+  private void requestCompactionInternal(HRegion region, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException
{
+    // request compaction on all stores
+    for (HStore store : region.stores.values()) {
+      requestCompactionInternal(region, store, why, priority, selectNow, tracker, user);
     }
-    return ret;
-  }
-
-  public CompactionRequest requestCompaction(final Region r, final Store s,
-      final String why, int priority, CompactionRequest request, User user) throws IOException
{
-    return requestCompactionInternal(r, s, why, priority, request, true, user);
-  }
-
-  public synchronized void requestSystemCompaction(
-      final Region r, final String why) throws IOException {
-    requestCompactionInternal(r, why, Store.NO_PRIORITY, null, false, null);
-  }
-
-  public void requestSystemCompaction(
-      final Region r, final Store s, final String why) throws IOException {
-    requestCompactionInternal(r, s, why, Store.NO_PRIORITY, null, false, null);
   }
 
-  /**
-   * @param r region store belongs to
-   * @param s Store to request compaction on
-   * @param why Why compaction requested -- used in debug messages
-   * @param priority override the default priority (NO_PRIORITY == decide)
-   * @param request custom compaction request. Can be <tt>null</tt> in which
case a simple
-   *          compaction will be used.
-   */
-  private synchronized CompactionRequest requestCompactionInternal(final Region r, final
Store s,
-      final String why, int priority, CompactionRequest request, boolean selectNow, User
user)
-          throws IOException {
-    if (this.server.isStopped()
-        || (r.getTableDescriptor() != null && !r.getTableDescriptor().isCompactionEnabled()))
{
-      return null;
+  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker, User user) throws IOException
{
+    if (this.server.isStopped() || (region.getTableDescriptor() != null &&
+        !region.getTableDescriptor().isCompactionEnabled())) {
+      return;
     }
-
-    CompactionContext compaction = null;
+    Optional<CompactionContext> compaction;
     if (selectNow) {
-      compaction = selectCompaction(r, s, priority, request, user);
-      if (compaction == null) return null; // message logged inside
+      compaction = selectCompaction(region, store, priority, tracker, user);
+      if (!compaction.isPresent()) {
+        // message logged inside
+        return;
+      }
+    } else {
+      compaction = Optional.empty();
     }
 
-    final RegionServerSpaceQuotaManager spaceQuotaManager =
-      this.server.getRegionServerSpaceQuotaManager();
-    if (spaceQuotaManager != null && spaceQuotaManager.areCompactionsDisabled(
-        r.getTableDescriptor().getTableName())) {
+    RegionServerSpaceQuotaManager spaceQuotaManager =
+        this.server.getRegionServerSpaceQuotaManager();
+    if (spaceQuotaManager != null &&
+        spaceQuotaManager.areCompactionsDisabled(region.getTableDescriptor().getTableName()))
{
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Ignoring compaction request for " + r + " as an active space quota violation
"
-            + " policy disallows compactions.");
+        LOG.debug("Ignoring compaction request for " + region +
+            " as an active space quota violation " + " policy disallows compactions.");
       }
-      return null;
+      return;
     }
 
-    // We assume that most compactions are small. So, put system compactions into small
-    // pool; we will do selection there, and move to large pool if necessary.
-    ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
-      ? longCompactions : shortCompactions;
-    pool.execute(new CompactionRunner(s, r, compaction, pool, user));
-    ((HRegion)r).incrementCompactionsQueuedCount();
+    ThreadPoolExecutor pool;
+    if (selectNow) {
+      // compaction.get is safe as we will just return if selectNow is true but no compaction
is
+      // selected
+      pool = store.throttleCompaction(compaction.get().getRequest().getSize()) ? longCompactions
+          : shortCompactions;
+    } else {
+      // We assume that most compactions are small. So, put system compactions into small
+      // pool; we will do selection there, and move to large pool if necessary.
+      pool = shortCompactions;
+    }
+    pool.execute(new CompactionRunner(store, region, compaction, pool, user));
+    region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
       LOG.debug(type + "Compaction requested: " + (selectNow ? compaction.toString() : "system")
           + (why != null && !why.isEmpty() ? "; Because: " + why : "") + "; " + this);
     }
-    return selectNow ? compaction.getRequest() : null;
   }
 
-  private CompactionContext selectCompaction(final Region r, final Store s,
-      int priority, CompactionRequest request, User user) throws IOException {
-    CompactionContext compaction = s.requestCompaction(priority, request, user);
-    if (compaction == null) {
-      if(LOG.isDebugEnabled() && r.getRegionInfo() != null) {
-        LOG.debug("Not compacting " + r.getRegionInfo().getRegionNameAsString() +
-            " because compaction request was cancelled");
-      }
-      return null;
-    }
-    assert compaction.hasSelection();
-    if (priority != Store.NO_PRIORITY) {
-      compaction.getRequest().setPriority(priority);
+  public synchronized void requestSystemCompaction(HRegion region, String why) throws IOException
{
+    requestCompactionInternal(region, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
+      throws IOException {
+    requestCompactionInternal(region, store, why, Store.NO_PRIORITY, false,
+      CompactionLifeCycleTracker.DUMMY, null);
+  }
+
+  private Optional<CompactionContext> selectCompaction(HRegion region, HStore store,
int priority,
+      CompactionLifeCycleTracker tracker, User user) throws IOException {
+    Optional<CompactionContext> compaction = store.requestCompaction(priority, tracker,
user);
+    if (!compaction.isPresent() && LOG.isDebugEnabled() && region.getRegionInfo()
!= null) {
+      LOG.debug("Not compacting " + region.getRegionInfo().getRegionNameAsString() +
+          " because compaction request was cancelled");
     }
     return compaction;
   }
@@ -468,33 +429,33 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
       if (cmp != 0) {
         return cmp;
       }
-      CompactionContext c1 = o1.compaction;
-      CompactionContext c2 = o2.compaction;
-      if (c1 == null) {
-        return c2 == null ? 0 : 1;
+      Optional<CompactionContext> c1 = o1.compaction;
+      Optional<CompactionContext> c2 = o2.compaction;
+      if (c1.isPresent()) {
+        return c2.isPresent() ? compare(c1.get().getRequest(), c2.get().getRequest()) : -1;
       } else {
-        return c2 == null ? -1 : compare(c1.getRequest(), c2.getRequest());
+        return c2.isPresent() ? 1 : 0;
       }
     }
   };
 
   private final class CompactionRunner implements Runnable {
-    private final Store store;
+    private final HStore store;
     private final HRegion region;
-    private CompactionContext compaction;
+    private final Optional<CompactionContext> compaction;
     private int queuedPriority;
     private ThreadPoolExecutor parent;
     private User user;
     private long time;
 
-    public CompactionRunner(Store store, Region region, CompactionContext compaction,
+    public CompactionRunner(HStore store, HRegion region, Optional<CompactionContext>
compaction,
         ThreadPoolExecutor parent, User user) {
       super();
       this.store = store;
-      this.region = (HRegion) region;
+      this.region = region;
       this.compaction = compaction;
-      this.queuedPriority =
-          compaction == null ? store.getCompactPriority() : compaction.getRequest().getPriority();
+      this.queuedPriority = compaction.isPresent() ? compaction.get().getRequest().getPriority()
+          : store.getCompactPriority();
       this.parent = parent;
       this.user = user;
       this.time = System.currentTimeMillis();
@@ -502,14 +463,15 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
 
     @Override
     public String toString() {
-      return (this.compaction != null) ? ("Request = " + compaction.getRequest())
-          : ("regionName = " + region.toString() + ", storeName = " + store.toString() +
-             ", priority = " + queuedPriority + ", time = " + time);
+      return compaction.map(c -> "Request = " + c.getRequest())
+          .orElse("regionName = " + region.toString() + ", storeName = " + store.toString()
+
+              ", priority = " + queuedPriority + ", time = " + time);
     }
 
     private void doCompaction(User user) {
+      CompactionContext c;
       // Common case - system compaction without a file selection. Select now.
-      if (this.compaction == null) {
+      if (!compaction.isPresent()) {
         int oldPriority = this.queuedPriority;
         this.queuedPriority = this.store.getCompactPriority();
         if (this.queuedPriority > oldPriority) {
@@ -518,44 +480,49 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
           this.parent.execute(this);
           return;
         }
+        Optional<CompactionContext> selected;
         try {
-          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null,
user);
+          selected = selectCompaction(this.region, this.store, queuedPriority,
+            CompactionLifeCycleTracker.DUMMY, user);
         } catch (IOException ex) {
           LOG.error("Compaction selection failed " + this, ex);
           server.checkFileSystem();
           region.decrementCompactionsQueuedCount();
           return;
         }
-        if (this.compaction == null) {
+        if (!selected.isPresent()) {
           region.decrementCompactionsQueuedCount();
           return; // nothing to do
         }
+        c = selected.get();
+        assert c.hasSelection();
         // Now see if we are in correct pool for the size; if not, go to the correct one.
         // We might end up waiting for a while, so cancel the selection.
-        assert this.compaction.hasSelection();
-        ThreadPoolExecutor pool = store.throttleCompaction(
-            compaction.getRequest().getSize()) ? longCompactions : shortCompactions;
+
+        ThreadPoolExecutor pool =
+            store.throttleCompaction(c.getRequest().getSize()) ? longCompactions : shortCompactions;
 
         // Long compaction pool can process small job
         // Short compaction pool should not process large job
         if (this.parent == shortCompactions && pool == longCompactions) {
-          this.store.cancelRequestedCompaction(this.compaction);
-          this.compaction = null;
+          this.store.cancelRequestedCompaction(c);
           this.parent = pool;
           this.parent.execute(this);
           return;
         }
+      } else {
+        c = compaction.get();
       }
       // Finally we can compact something.
-      assert this.compaction != null;
+      assert c != null;
 
-      this.compaction.getRequest().beforeExecute();
+      c.getRequest().getTracker().beforeExecute(store);
       try {
         // Note: please don't put single-compaction logic here;
         //       put it into region/store/etc. This is CST logic.
         long start = EnvironmentEdgeManager.currentTime();
         boolean completed =
-            region.compact(compaction, store, compactionThroughputController, user);
+            region.compact(c, store, compactionThroughputController, user);
         long now = EnvironmentEdgeManager.currentTime();
         LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
               this + "; duration=" + StringUtils.formatTimeDiff(now, start));
@@ -582,10 +549,10 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
         region.reportCompactionRequestFailure();
         server.checkFileSystem();
       } finally {
+        c.getRequest().getTracker().afterExecute(store);
         region.decrementCompactionsQueuedCount();
         LOG.debug("CompactSplitThread Status: " + CompactSplit.this);
       }
-      this.compaction.getRequest().afterExecute();
     }
 
     @Override
@@ -615,9 +582,9 @@ public class CompactSplit implements CompactionRequestor, PropagatingConfigurati
     @Override
     public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
       if (runnable instanceof CompactionRunner) {
-        CompactionRunner runner = (CompactionRunner)runnable;
+        CompactionRunner runner = (CompactionRunner) runnable;
         LOG.debug("Compaction Rejected: " + runner);
-        runner.store.cancelRequestedCompaction(runner.compaction);
+        runner.compaction.ifPresent(c -> runner.store.cancelRequestedCompaction(c));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
deleted file mode 100644
index d1f02fe..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.util.Pair;
-
-@InterfaceAudience.Private
-public interface CompactionRequestor {
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if
no
-   *         compactions were started
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(final Region r, final String why)
-      throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param requests custom compaction requests. Each compaction must specify the store on
which it
-   *          is acting. Can be <tt>null</tt> in which case a compaction will
be attempted on all
-   *          stores for the region.
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if
no
-   *         compactions were started
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(
-    final Region r, final String why, List<Pair<CompactionRequest, Store>> requests
-  )
-      throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param s Store within region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param request custom compaction request for the {@link Region} and {@link Store}. Custom
-   *          request must be <tt>null</tt> or be constructed with matching region
and store.
-   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction
was started.
-   * @throws IOException
-   */
-  CompactionRequest requestCompaction(
-    final Region r, final Store s, final String why, CompactionRequest request
-  ) throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param pri Priority of this compaction. minHeap. &lt;=0 is critical
-   * @param requests custom compaction requests. Each compaction must specify the store on
which it
-   *          is acting. Can be <tt>null</tt> in which case a compaction will
be attempted on all
-   *          stores for the region.
-   * @param user  the effective user
-   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if
no
-   *         compactions were started.
-   * @throws IOException
-   */
-  List<CompactionRequest> requestCompaction(
-    final Region r, final String why, int pri, List<Pair<CompactionRequest, Store>>
requests,
-    User user
-  ) throws IOException;
-
-  /**
-   * @param r Region to compact
-   * @param s Store within region to compact
-   * @param why Why compaction was requested -- used in debug messages
-   * @param pri Priority of this compaction. minHeap. &lt;=0 is critical
-   * @param request custom compaction request to run. {@link Store} and {@link Region} for
the
-   *          request must match the region and store specified here.
-   * @param user
-   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction
was started
-   * @throws IOException
-   */
-  CompactionRequest requestCompaction(
-    final Region r, final Store s, final String why, int pri, CompactionRequest request,
User user
-  ) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
index b0eae71..e4476d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllLargeStoresPolicy.java
@@ -31,7 +31,7 @@ import org.apache.yetus.audience.InterfaceAudience;
  * enough, then all stores will be flushed.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
+public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy {
 
   private static final Log LOG = LogFactory.getLog(FlushAllLargeStoresPolicy.class);
 
@@ -48,20 +48,22 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }
 
   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     // no need to select stores if only one family
     if (region.getTableDescriptor().getColumnFamilyCount() == 1) {
       return region.stores.values();
     }
     // start selection
-    Collection<Store> stores = region.stores.values();
-    Set<Store> specificStoresToFlush = new HashSet<>();
-    for (Store store : stores) {
+    Collection<HStore> stores = region.stores.values();
+    Set<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : stores) {
       if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if (!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
 
     // Didn't find any CFs which were above the threshold for selection.
     if (LOG.isDebugEnabled()) {
@@ -71,8 +73,8 @@ public class FlushAllLargeStoresPolicy extends FlushLargeStoresPolicy{
   }
 
   @Override
-  protected boolean shouldFlush(Store store) {
-    return (super.shouldFlush(store) || region.shouldFlushStore(store));
+  protected boolean shouldFlush(HStore store) {
+    return super.shouldFlush(store) || region.shouldFlushStore(store);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
index 5c7b3af..97a04f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushAllStoresPolicy.java
@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 public class FlushAllStoresPolicy extends FlushPolicy {
 
   @Override
-  public Collection<Store> selectStoresToFlush() {
+  public Collection<HStore> selectStoresToFlush() {
     return region.stores.values();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
index e37a1a2..e0c6510 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLargeStoresPolicy.java
@@ -77,7 +77,7 @@ public abstract class FlushLargeStoresPolicy extends FlushPolicy {
     return flushSizeLowerBound;
   }
 
-  protected boolean shouldFlush(Store store) {
+  protected boolean shouldFlush(HStore store) {
     if (store.getSizeOfMemStore().getDataSize() > this.flushSizeLowerBound) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Flush Column Family " + store.getColumnFamilyName() + " of " +

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
index 1196bd5..c779ce3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushNonSloppyStoresFirstPolicy.java
@@ -32,26 +32,31 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
 public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy {
 
-  private Collection<Store> regularStores = new HashSet<>();
-  private Collection<Store> sloppyStores = new HashSet<>();
+  private Collection<HStore> regularStores = new HashSet<>();
+  private Collection<HStore> sloppyStores = new HashSet<>();
 
   /**
    * @return the stores need to be flushed.
    */
-  @Override public Collection<Store> selectStoresToFlush() {
-    Collection<Store> specificStoresToFlush = new HashSet<>();
-    for(Store store : regularStores) {
-      if(shouldFlush(store) || region.shouldFlushStore(store)) {
+  @Override
+  public Collection<HStore> selectStoresToFlush() {
+    Collection<HStore> specificStoresToFlush = new HashSet<>();
+    for (HStore store : regularStores) {
+      if (shouldFlush(store) || region.shouldFlushStore(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
-    for(Store store : sloppyStores) {
-      if(shouldFlush(store)) {
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
+    for (HStore store : sloppyStores) {
+      if (shouldFlush(store)) {
         specificStoresToFlush.add(store);
       }
     }
-    if(!specificStoresToFlush.isEmpty()) return specificStoresToFlush;
+    if (!specificStoresToFlush.isEmpty()) {
+      return specificStoresToFlush;
+    }
     return region.stores.values();
   }
 
@@ -59,8 +64,8 @@ public class FlushNonSloppyStoresFirstPolicy extends FlushLargeStoresPolicy
{
   protected void configureForRegion(HRegion region) {
     super.configureForRegion(region);
     this.flushSizeLowerBound = getFlushSizeLowerBound(region);
-    for(Store store : region.stores.values()) {
-      if(store.isSloppyMemstore()) {
+    for (HStore store : region.stores.values()) {
+      if (store.isSloppyMemstore()) {
         sloppyStores.add(store);
       } else {
         regularStores.add(store);

http://git-wip-us.apache.org/repos/asf/hbase/blob/61d10fef/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
index bc49c92..fecbd2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushPolicy.java
@@ -44,6 +44,6 @@ public abstract class FlushPolicy extends Configured {
   /**
    * @return the stores need to be flushed.
    */
-  public abstract Collection<Store> selectStoresToFlush();
+  public abstract Collection<HStore> selectStoresToFlush();
 
 }


Mime
View raw message