hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [3/3] hbase git commit: HBASE-14969 Add throughput controller for flush
Date Fri, 29 Jan 2016 01:44:21 GMT
HBASE-14969 Add throughput controller for flush

Signed-off-by: zhangduo <zhangduo@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3b1ce99
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3b1ce99
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3b1ce99

Branch: refs/heads/master
Commit: b3b1ce99c63d79401ddda9c114850dea61af0afb
Parents: 14dd959
Author: Yu Li <jueding.ly@alibaba-inc.com>
Authored: Fri Jan 29 09:30:20 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Fri Jan 29 09:32:01 2016 +0800

----------------------------------------------------------------------
 .../hbase/mob/DefaultMobStoreCompactor.java     |   4 +-
 .../hbase/mob/DefaultMobStoreFlusher.java       |   3 +-
 .../hbase/regionserver/CompactSplitThread.java  |  10 +-
 .../hbase/regionserver/CompactionTool.java      |   4 +-
 .../hbase/regionserver/DefaultStoreEngine.java  |  10 +-
 .../hbase/regionserver/DefaultStoreFlusher.java |   5 +-
 .../hadoop/hbase/regionserver/HMobStore.java    |   4 +-
 .../hadoop/hbase/regionserver/HRegion.java      |  18 +-
 .../hbase/regionserver/HRegionServer.java       |  35 ++-
 .../hadoop/hbase/regionserver/HStore.java       |  23 +-
 .../regionserver/RegionServerServices.java      |  13 +
 .../apache/hadoop/hbase/regionserver/Store.java |  12 +-
 .../hadoop/hbase/regionserver/StoreFlusher.java |  51 +++-
 .../hbase/regionserver/StripeStoreEngine.java   |   8 +-
 .../hbase/regionserver/StripeStoreFlusher.java  |   5 +-
 .../compactions/CompactionContext.java          |   5 +-
 .../CompactionThroughputController.java         |  52 ----
 .../CompactionThroughputControllerFactory.java  |  61 ----
 .../regionserver/compactions/Compactor.java     |  28 +-
 .../compactions/DefaultCompactor.java           |  10 +-
 .../NoLimitCompactionThroughputController.java  |  66 ----
 ...sureAwareCompactionThroughputController.java | 263 ----------------
 .../compactions/StripeCompactionPolicy.java     |   9 +-
 .../compactions/StripeCompactor.java            |  11 +-
 .../CompactionThroughputControllerFactory.java  |  91 ++++++
 .../FlushThroughputControllerFactory.java       |  65 ++++
 .../throttle/NoLimitThroughputController.java   |  62 ++++
 ...sureAwareCompactionThroughputController.java | 153 ++++++++++
 .../PressureAwareFlushThroughputController.java | 136 +++++++++
 .../PressureAwareThroughputController.java      | 177 +++++++++++
 .../throttle/ThroughputControlUtil.java         |  55 ++++
 .../throttle/ThroughputController.java          |  52 ++++
 .../hadoop/hbase/MockRegionServerServices.java  |  10 +
 .../org/apache/hadoop/hbase/TestIOFencing.java  |   7 +-
 .../TestRegionObserverScannerOpenHook.java      |   8 +-
 .../hadoop/hbase/master/MockRegionServer.java   |  10 +
 .../regionserver/TestCompactSplitThread.java    |   4 +-
 .../hbase/regionserver/TestCompaction.java      |  20 +-
 .../hbase/regionserver/TestHMobStore.java       |   4 +-
 .../regionserver/TestHRegionReplayEvents.java   |   4 +-
 .../TestSplitTransactionOnCluster.java          |   4 +-
 .../hadoop/hbase/regionserver/TestStore.java    |   4 +-
 .../hbase/regionserver/TestStripeCompactor.java |   6 +-
 .../regionserver/TestStripeStoreEngine.java     |  10 +-
 .../TestCompactionWithThroughputController.java | 302 ------------------
 .../compactions/TestStripeCompactionPolicy.java |  15 +-
 .../TestCompactionWithThroughputController.java | 306 +++++++++++++++++++
 .../TestFlushWithThroughputController.java      | 217 +++++++++++++
 .../hbase/regionserver/wal/TestWALReplay.java   |  18 +-
 49 files changed, 1559 insertions(+), 891 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
index b5f412d..33eb7b9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -151,7 +151,7 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   @Override
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId,
-      CompactionThroughputController throughputController,  boolean major) throws IOException {
+      ThroughputController throughputController,  boolean major) throws IOException {
     if (!(scanner instanceof MobCompactionStoreScanner)) {
       throw new IllegalArgumentException(
         "The scanner should be an instance of MobCompactionStoreScanner");

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
index 999d25c..eb6c739 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.regionserver.MemStoreSnapshot;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.util.StringUtils;
 
@@ -96,7 +97,7 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
    */
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status, ThroughputController throughputController) throws IOException {
     ArrayList<Path> result = new ArrayList<Path>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
index f54f008..579f78e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
@@ -40,8 +40,8 @@ import org.apache.hadoop.hbase.conf.ConfigurationManager;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;
@@ -89,7 +89,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
   private final ThreadPoolExecutor splits;
   private final ThreadPoolExecutor mergePool;
 
-  private volatile CompactionThroughputController compactionThroughputController;
+  private volatile ThroughputController compactionThroughputController;
 
   /**
    * Splitting should not take place if the total number of regions exceed this.
@@ -672,7 +672,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
       }
     }
 
-    CompactionThroughputController old = this.compactionThroughputController;
+    ThroughputController old = this.compactionThroughputController;
     if (old != null) {
       old.stop("configuration change");
     }
@@ -717,7 +717,7 @@ public class CompactSplitThread implements CompactionRequestor, PropagatingConfi
   }
 
   @VisibleForTesting
-  public CompactionThroughputController getCompactionThroughputController() {
+  public ThroughputController getCompactionThroughputController() {
     return compactionThroughputController;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
index 0f7e42a..4ad82ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -164,7 +164,7 @@ public class CompactionTool extends Configured implements Tool {
         CompactionContext compaction = store.requestCompaction(Store.PRIORITY_USER, null);
         if (compaction == null) break;
         List<StoreFile> storeFiles =
-            store.compact(compaction, NoLimitCompactionThroughputController.INSTANCE);
+            store.compact(compaction, NoLimitThroughputController.INSTANCE);
         if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
             for (StoreFile storeFile: storeFiles) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
index da326e3..e73a456 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
@@ -21,16 +21,16 @@ package org.apache.hadoop.hbase.regionserver;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
-import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
@@ -120,13 +120,13 @@ public class DefaultStoreEngine extends StoreEngine<
     }
 
     @Override
-    public List<Path> compact(CompactionThroughputController throughputController)
+    public List<Path> compact(ThroughputController throughputController)
         throws IOException {
       return compact(throughputController, null);
     }
 
     @Override
-    public List<Path> compact(CompactionThroughputController throughputController, User user)
+    public List<Path> compact(ThroughputController throughputController, User user)
         throws IOException {
       return compactor.compact(request, throughputController, user);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
index 2f51277..711b987 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -44,7 +45,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status, ThroughputController throughputController) throws IOException {
     ArrayList<Path> result = new ArrayList<Path>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -71,7 +72,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
         writer.setTimeRangeTracker(snapshot.getTimeRangeTracker());
         IOException e = null;
         try {
-          performFlush(scanner, writer, smallestReadPoint);
+          performFlush(scanner, writer, smallestReadPoint, throughputController);
         } catch (IOException ioe) {
           e = ioe;
           // throw the exception out

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
index 496c7e2..d666db5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HMobStore.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobStoreEngine;
 import org.apache.hadoop.hbase.mob.MobUtils;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HFileArchiveUtil;
@@ -458,7 +458,7 @@ public class HMobStore extends HStore {
    */
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
-      CompactionThroughputController throughputController) throws IOException {
+      ThroughputController throughputController) throws IOException {
     // If it's major compaction, try to find whether there's a sweeper is running
     // If yes, mark the major compaction as retainDeleteMarkers
     if (compaction.getRequest().isAllFiles()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 34a37f1..05ef2ad 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -151,9 +151,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputControllerFactory;
-import org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.ReplayHLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -1727,12 +1727,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       CompactionContext compaction = s.requestCompaction();
       if (compaction != null) {
-        CompactionThroughputController controller = null;
+        ThroughputController controller = null;
         if (rsServices != null) {
           controller = CompactionThroughputControllerFactory.create(rsServices, conf);
         }
         if (controller == null) {
-          controller = NoLimitCompactionThroughputController.INSTANCE;
+          controller = NoLimitThroughputController.INSTANCE;
         }
         compact(compaction, s, controller, null);
       }
@@ -1749,7 +1749,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     for (Store s : getStores()) {
       CompactionContext compaction = s.requestCompaction();
       if (compaction != null) {
-        compact(compaction, s, NoLimitCompactionThroughputController.INSTANCE, null);
+        compact(compaction, s, NoLimitThroughputController.INSTANCE, null);
       }
     }
   }
@@ -1761,7 +1761,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @throws IOException e
    */
   @VisibleForTesting
-  void compactStore(byte[] family, CompactionThroughputController throughputController)
+  void compactStore(byte[] family, ThroughputController throughputController)
       throws IOException {
     Store s = getStore(family);
     CompactionContext compaction = s.requestCompaction();
@@ -1786,12 +1786,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @return whether the compaction completed
    */
   public boolean compact(CompactionContext compaction, Store store,
-      CompactionThroughputController throughputController) throws IOException {
+      ThroughputController throughputController) throws IOException {
     return compact(compaction, store, throughputController, null);
   }
 
   public boolean compact(CompactionContext compaction, Store store,
-      CompactionThroughputController throughputController, User user) throws IOException {
+      ThroughputController throughputController, User user) throws IOException {
     assert compaction != null && compaction.hasSelection();
     assert !compaction.getRequest().getFiles().isEmpty();
     if (this.closing.get() || this.closed.get()) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b2cc78a..1183f96 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -83,6 +83,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionUtils;
 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
@@ -139,6 +140,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
+import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -189,6 +192,7 @@ import com.google.protobuf.RpcCallback;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.Service;
 import com.google.protobuf.ServiceException;
+
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
@@ -198,7 +202,8 @@ import sun.misc.SignalHandler;
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
 @SuppressWarnings("deprecation")
-public class HRegionServer extends HasThread implements RegionServerServices, LastSequenceId {
+public class HRegionServer extends HasThread implements
+    RegionServerServices, LastSequenceId, ConfigurationObserver {
 
   private static final Log LOG = LogFactory.getLog(HRegionServer.class);
 
@@ -487,6 +492,8 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
 
   private CompactedHFilesDischarger compactedFileDischarger;
 
+  private volatile ThroughputController flushThroughputController;
+
   /**
    * Starts a HRegionServer at the default location.
    * @param conf
@@ -609,6 +616,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
     putUpWebUI();
     this.walRoller = new LogRoller(this, this);
     this.choreService = new ChoreService(getServerName().toString(), true);
+    this.flushThroughputController = FlushThroughputControllerFactory.create(this, conf);
 
     if (!SystemUtils.IS_OS_WINDOWS) {
       Signal.handle(new Signal("HUP"), new SignalHandler() {
@@ -899,6 +907,7 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
     // Registering the compactSplitThread object with the ConfigurationManager.
     configurationManager.registerObserver(this.compactSplitThread);
     configurationManager.registerObserver(this.rpcServices);
+    configurationManager.registerObserver(this);
   }
 
   /**
@@ -3380,4 +3389,28 @@ public class HRegionServer extends HasThread implements RegionServerServices, La
   public boolean walRollRequestFinished() {
     return this.walRoller.walRollFinished();
   }
+
+  @Override
+  public ThroughputController getFlushThroughputController() {
+    return flushThroughputController;
+  }
+
+  @Override
+  public double getFlushPressure() {
+    if (getRegionServerAccounting() == null || cacheFlusher == null) {
+      // return 0 during RS initialization
+      return 0.0;
+    }
+    return getRegionServerAccounting().getGlobalMemstoreSize() * 1.0
+        / cacheFlusher.globalMemStoreLimitLowMark;
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration newConf) {
+    ThroughputController old = this.flushThroughputController;
+    if (old != null) {
+      old.stop("configuration change");
+    }
+    this.flushThroughputController = FlushThroughputControllerFactory.create(this, newConf);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 9ebdaee..7f90e17 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -77,9 +77,9 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
 import org.apache.hadoop.hbase.security.User;
@@ -845,7 +845,7 @@ public class HStore implements Store {
 
   /**
    * Snapshot this stores memstore. Call before running
-   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask)}
+   * {@link #flushCache(long, MemStoreSnapshot, MonitoredTask, ThroughputController)}
    *  so it has some work to do.
    */
   void snapshot() {
@@ -858,15 +858,16 @@ public class HStore implements Store {
   }
 
   /**
-   * Write out current snapshot.  Presumes {@link #snapshot()} has been called previously.
+   * Write out current snapshot. Presumes {@link #snapshot()} has been called previously.
    * @param logCacheFlushId flush sequence number
    * @param snapshot
    * @param status
+   * @param throughputController
    * @return The path name of the tmp file to which the store was flushed
-   * @throws IOException
+   * @throws IOException if exception occurs during process
    */
   protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status, ThroughputController throughputController) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
@@ -876,7 +877,8 @@ public class HStore implements Store {
     IOException lastException = null;
     for (int i = 0; i < flushRetriesNumber; i++) {
       try {
-        List<Path> pathNames = flusher.flushSnapshot(snapshot, logCacheFlushId, status);
+        List<Path> pathNames =
+            flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
         Path lastPathName = null;
         try {
           for (Path pathName : pathNames) {
@@ -1175,13 +1177,13 @@ public class HStore implements Store {
    */
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
-      CompactionThroughputController throughputController) throws IOException {
+      ThroughputController throughputController) throws IOException {
     return compact(compaction, throughputController, null);
   }
 
   @Override
   public List<StoreFile> compact(CompactionContext compaction,
-    CompactionThroughputController throughputController, User user) throws IOException {
+    ThroughputController throughputController, User user) throws IOException {
     assert compaction != null;
     List<StoreFile> sfs = null;
     CompactionRequest cr = compaction.getRequest();
@@ -2058,7 +2060,10 @@ public class HStore implements Store {
 
     @Override
     public void flushCache(MonitoredTask status) throws IOException {
-      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status);
+      RegionServerServices rsService = region.getRegionServerServices();
+      ThroughputController throughputController =
+          rsService == null ? null : rsService.getFlushThroughputController();
+      tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index cd4816c..2a71629 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.zookeeper.KeeperException;
 
@@ -231,4 +232,16 @@ public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegi
    * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
    */
   double getCompactionPressure();
+
+  /**
+   * @return the controller used to keep flushes from running too fast
+   */
+  ThroughputController getFlushThroughputController();
+
+  /**
+   * @return the flush pressure of all stores on this regionserver. The value should be greater than
+   *         or equal to 0.0, and any value greater than 1.0 means we have entered the emergency
+   *         state in which the global memstore size already exceeds the lower limit.
+   */
+  double getFlushPressure();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
index 8bb10f0..1db213c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
@@ -22,8 +22,6 @@ import java.util.Collection;
 import java.util.List;
 import java.util.NavigableSet;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
@@ -32,6 +30,8 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.PropagatingConfigurationObserver;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
 /**
@@ -225,14 +225,14 @@ public interface Store extends HeapSize, StoreConfigInformation, PropagatingConf
   void cancelRequestedCompaction(CompactionContext compaction);
 
   /**
-   * @deprecated see compact(CompactionContext, CompactionThroughputController, User)
+   * @deprecated see compact(CompactionContext, ThroughputController, User)
    */
   @Deprecated
   List<StoreFile> compact(CompactionContext compaction,
-      CompactionThroughputController throughputController) throws IOException;
+      ThroughputController throughputController) throws IOException;
 
   List<StoreFile> compact(CompactionContext compaction,
-    CompactionThroughputController throughputController, User user) throws IOException;
+    ThroughputController throughputController, User user) throws IOException;
 
   /**
    * @return true if we should run a major compaction.

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
index bcc0a90..9b182a2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -27,10 +28,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 
 /**
  * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -51,10 +55,11 @@ abstract class StoreFlusher {
    * @param snapshot Memstore snapshot.
    * @param cacheFlushSeqNum Log cache flush sequence number.
    * @param status Task that represents the flush operation and may be updated with status.
+   * @param throughputController A controller that keeps the flush from running too fast
    * @return List of files written. Can be empty; must not be null.
    */
   public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-      MonitoredTask status) throws IOException;
+      MonitoredTask status, ThroughputController throughputController) throws IOException;
 
   protected void finalizeWriter(StoreFile.Writer writer, long cacheFlushSeqNum,
       MonitoredTask status) throws IOException {
@@ -104,9 +109,10 @@ abstract class StoreFlusher {
    * @param scanner Scanner to get data from.
    * @param sink Sink to write data to. Could be StoreFile.Writer.
    * @param smallestReadPoint Smallest read point used for the flush.
+   * @param throughputController A controller that keeps the flush from running too fast
    */
-  protected void performFlush(InternalScanner scanner,
-      Compactor.CellSink sink, long smallestReadPoint) throws IOException {
+  protected void performFlush(InternalScanner scanner, Compactor.CellSink sink,
+      long smallestReadPoint, ThroughputController throughputController) throws IOException {
     int compactionKVMax =
       conf.getInt(HConstants.COMPACTION_KV_MAX, HConstants.COMPACTION_KV_MAX_DEFAULT);
 
@@ -115,17 +121,36 @@ abstract class StoreFlusher {
 
     List<Cell> kvs = new ArrayList<Cell>();
     boolean hasMore;
-    do {
-      hasMore = scanner.next(kvs, scannerContext);
-      if (!kvs.isEmpty()) {
-        for (Cell c : kvs) {
-          // If we know that this KV is going to be included always, then let us
-          // set its memstoreTS to 0. This will help us save space when writing to
-          // disk.
-          sink.append(c);
+    String flushName = ThroughputControlUtil.getNameForThrottling(store, "flush");
+    // do not throttle flushes of system tables (such as meta, namespace, etc.)
+    boolean control = throughputController != null && !store.getRegionInfo().isSystemTable();
+    if (control) {
+      throughputController.start(flushName);
+    }
+    try {
+      do {
+        hasMore = scanner.next(kvs, scannerContext);
+        if (!kvs.isEmpty()) {
+          for (Cell c : kvs) {
+            // If we know that this KV is going to be included always, then let us
+            // set its memstoreTS to 0. This will help us save space when writing to
+            // disk.
+            sink.append(c);
+            int len = KeyValueUtil.length(c);
+            if (control) {
+              throughputController.control(flushName, len);
+            }
+          }
+          kvs.clear();
         }
-        kvs.clear();
+      } while (hasMore);
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted while control throughput of flushing "
+          + flushName);
+    } finally {
+      if (control) {
+        throughputController.finish(flushName);
       }
-    } while (hasMore);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
index 3707290..1e16ca8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
@@ -23,16 +23,16 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
 import com.google.common.base.Preconditions;
@@ -100,14 +100,14 @@ public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
     }
 
     @Override
-    public List<Path> compact(CompactionThroughputController throughputController)
+    public List<Path> compact(ThroughputController throughputController)
         throws IOException {
       Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
       return this.stripeRequest.execute(compactor, throughputController, null);
     }
 
     @Override
-    public List<Path> compact(CompactionThroughputController throughputController, User user)
+    public List<Path> compact(ThroughputController throughputController, User user)
         throws IOException {
       Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
       return this.stripeRequest.execute(compactor, throughputController, user);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
index 9deea7a..9a06a88 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -56,7 +57,7 @@ public class StripeStoreFlusher extends StoreFlusher {
 
   @Override
   public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
-      MonitoredTask status) throws IOException {
+      MonitoredTask status, ThroughputController throughputController) throws IOException {
     List<Path> result = new ArrayList<Path>();
     int cellsCount = snapshot.getCellsCount();
     if (cellsCount == 0) return result; // don't flush if there are no entries
@@ -80,7 +81,7 @@ public class StripeStoreFlusher extends StoreFlusher {
       mw.init(storeScanner, factory, store.getComparator());
 
       synchronized (flushLock) {
-        performFlush(scanner, mw, smallestReadPoint);
+        performFlush(scanner, mw, smallestReadPoint, throughputController);
         result = mw.commitWriters(cacheFlushSeqNum, false);
         success = true;
       }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
index cb16966..6902c40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
 
@@ -69,10 +70,10 @@ public abstract class CompactionContext {
    * Runs the compaction based on current selection. select/forceSelect must have been called.
    * @return The new file paths resulting from compaction.
    */
-  public abstract List<Path> compact(CompactionThroughputController throughputController)
+  public abstract List<Path> compact(ThroughputController throughputController)
       throws IOException;
 
-  public abstract List<Path> compact(CompactionThroughputController throughputController, User user)
+  public abstract List<Path> compact(ThroughputController throughputController, User user)
       throws IOException;
 
   public CompactionRequest getRequest() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java
deleted file mode 100644
index 657ecb4..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputController.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-
-/**
- * A utility that constrains the total throughput of one or more simultaneous flows (compactions) by
- * sleeping when necessary.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public interface CompactionThroughputController extends Stoppable {
-
-  /**
-   * Setup controller for the given region server.
-   */
-  void setup(RegionServerServices server);
-
-  /**
-   * Start a compaction.
-   */
-  void start(String compactionName);
-
-  /**
-   * Control the compaction throughput. Will sleep if too fast.
-   * @return the actual sleep time.
-   */
-  long control(String compactionName, long size) throws InterruptedException;
-
-  /**
-   * Finish a compaction. Should call this method in a finally block.
-   */
-  void finish(String compactionName);
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java
deleted file mode 100644
index 132c27f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionThroughputControllerFactory.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.util.ReflectionUtils;
-
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class CompactionThroughputControllerFactory {
-
-  private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
-
-  public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
-      "hbase.regionserver.throughput.controller";
-
-  private static final Class<? extends CompactionThroughputController>
-      DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
-
-  public static CompactionThroughputController create(RegionServerServices server,
-      Configuration conf) {
-    Class<? extends CompactionThroughputController> clazz = getThroughputControllerClass(conf);
-    CompactionThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
-    controller.setup(server);
-    return controller;
-  }
-
-  public static Class<? extends CompactionThroughputController> getThroughputControllerClass(
-      Configuration conf) {
-    String className =
-        conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
-    try {
-      return Class.forName(className).asSubclass(CompactionThroughputController.class);
-    } catch (Exception e) {
-      LOG.warn(
-        "Unable to load configured throughput controller '" + className
-            + "', load default throughput controller "
-            + DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
-      return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
index 78c8d0f..0e6ab05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -48,10 +47,12 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 
 /**
@@ -282,25 +283,6 @@ public abstract class Compactor {
   }
 
   /**
-   * Used to prevent compaction name conflict when multiple compactions running parallel on the
-   * same store.
-   */
-  private static final AtomicInteger NAME_COUNTER = new AtomicInteger(0);
-
-  private String generateCompactionName() {
-    int counter;
-    for (;;) {
-      counter = NAME_COUNTER.get();
-      int next = counter == Integer.MAX_VALUE ? 0 : counter + 1;
-      if (NAME_COUNTER.compareAndSet(counter, next)) {
-        break;
-      }
-    }
-    return store.getRegionInfo().getRegionNameAsString() + "#"
-        + store.getFamily().getNameAsString() + "#" + counter;
-  }
-
-  /**
    * Performs the compaction.
    * @param fd FileDetails of cell sink writer
    * @param scanner Where to read from.
@@ -312,7 +294,7 @@ public abstract class Compactor {
    */
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId,
-      CompactionThroughputController throughputController, boolean major) throws IOException {
+      ThroughputController throughputController, boolean major) throws IOException {
     long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
@@ -324,7 +306,7 @@ public abstract class Compactor {
     if (LOG.isDebugEnabled()) {
       lastMillis = EnvironmentEdgeManager.currentTime();
     }
-    String compactionName = generateCompactionName();
+    String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
     ScannerContext scannerContext =

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
index 069d221..e7e0cca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
@@ -34,11 +34,13 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 
 /**
  * Compact passed set of files. Create an instance and then call
- * {@link #compact(CompactionRequest, CompactionThroughputController, User)}
+ * {@link #compact(CompactionRequest, ThroughputController, User)}
  */
 @InterfaceAudience.Private
 public class DefaultCompactor extends Compactor {
@@ -52,7 +54,7 @@ public class DefaultCompactor extends Compactor {
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
    */
   public List<Path> compact(final CompactionRequest request,
-      CompactionThroughputController throughputController, User user) throws IOException {
+      ThroughputController throughputController, User user) throws IOException {
     FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
     this.progress = new CompactionProgress(fd.maxKeyCount);
 
@@ -173,7 +175,7 @@ public class DefaultCompactor extends Compactor {
 
   /**
    * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
-   * {@link #compact(CompactionRequest, CompactionThroughputController, User)};
+   * {@link #compact(CompactionRequest, ThroughputController, User)};
    * @param filesToCompact the files to compact. These are used as the compactionSelection for
    *          the generated {@link CompactionRequest}.
    * @param isMajor true to major compact (prune all deletes, max versions, etc)
@@ -185,6 +187,6 @@ public class DefaultCompactor extends Compactor {
       throws IOException {
     CompactionRequest cr = new CompactionRequest(filesToCompact);
     cr.setIsMajor(isMajor, isMajor);
-    return this.compact(cr, NoLimitCompactionThroughputController.INSTANCE, null);
+    return this.compact(cr, NoLimitThroughputController.INSTANCE, null);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java
deleted file mode 100644
index 766b2cb..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/NoLimitCompactionThroughputController.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-
-/**
- * A dummy CompactionThroughputController that does nothing.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class NoLimitCompactionThroughputController implements CompactionThroughputController {
-
-  public static final NoLimitCompactionThroughputController INSTANCE =
-      new NoLimitCompactionThroughputController();
-
-  @Override
-  public void setup(RegionServerServices server) {
-  }
-
-  @Override
-  public void start(String compactionName) {
-  }
-
-  @Override
-  public long control(String compactionName, long size) throws InterruptedException {
-    return 0;
-  }
-
-  @Override
-  public void finish(String compactionName) {
-  }
-
-  private volatile boolean stopped;
-
-  @Override
-  public void stop(String why) {
-    stopped = true;
-  }
-
-  @Override
-  public boolean isStopped() {
-    return stopped;
-  }
-
-  @Override
-  public String toString() {
-    return "NoLimitCompactionThroughputController";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java
deleted file mode 100644
index 11ab568..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/PressureAwareCompactionThroughputController.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.regionserver.compactions;
-
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.ScheduledChore;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.RegionServerServices;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-
-/**
- * A throughput controller which uses the follow schema to limit throughput
- * <ul>
- * <li>If compaction pressure is greater than 1.0, no limitation.</li>
- * <li>In off peak hours, use a fixed throughput limitation
- * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK}</li>
- * <li>In normal hours, the max throughput is tune between
- * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND} and
- * {@value #HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND}, using the formula &quot;lower +
- * (higer - lower) * compactionPressure&quot;, where compactionPressure is in range [0.0, 1.0]</li>
- * </ul>
- * @see org.apache.hadoop.hbase.regionserver.Store#getCompactionPressure()
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
-public class PressureAwareCompactionThroughputController extends Configured implements
-    CompactionThroughputController, Stoppable {
-
-  private final static Log LOG = LogFactory
-      .getLog(PressureAwareCompactionThroughputController.class);
-
-  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
-      "hbase.hstore.compaction.throughput.higher.bound";
-
-  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND =
-      20L * 1024 * 1024;
-
-  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
-      "hbase.hstore.compaction.throughput.lower.bound";
-
-  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND =
-      10L * 1024 * 1024;
-
-  public static final String HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK =
-      "hbase.hstore.compaction.throughput.offpeak";
-
-  private static final long DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK = Long.MAX_VALUE;
-
-  public static final String HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD =
-      "hbase.hstore.compaction.throughput.tune.period";
-
-  private static final int DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD = 60 * 1000;
-
-  /**
-   * Stores the information of one controlled compaction.
-   */
-  private static final class ActiveCompaction {
-
-    private final long startTime;
-
-    private long lastControlTime;
-
-    private long lastControlSize;
-
-    private long totalSize;
-
-    private long numberOfSleeps;
-
-    private long totalSleepTime;
-
-    // prevent too many debug log
-    private long lastLogTime;
-
-    ActiveCompaction() {
-      long currentTime = EnvironmentEdgeManager.currentTime();
-      this.startTime = currentTime;
-      this.lastControlTime = currentTime;
-      this.lastLogTime = currentTime;
-    }
-  }
-
-  private long maxThroughputHigherBound;
-
-  private long maxThroughputLowerBound;
-
-  private long maxThroughputOffpeak;
-
-  private OffPeakHours offPeakHours;
-
-  private long controlPerSize;
-
-  private int tuningPeriod;
-
-  volatile double maxThroughput;
-
-  private final ConcurrentMap<String, ActiveCompaction> activeCompactions =
-      new ConcurrentHashMap<String, ActiveCompaction>();
-
-  @Override
-  public void setup(final RegionServerServices server) {
-    server.getChoreService().scheduleChore(
-      new ScheduledChore("CompactionThroughputTuner", this, tuningPeriod) {
-
-        @Override
-        protected void chore() {
-          tune(server.getCompactionPressure());
-        }
-      });
-  }
-
-  private void tune(double compactionPressure) {
-    double maxThroughputToSet;
-    if (compactionPressure > 1.0) {
-      // set to unlimited if some stores already reach the blocking store file count
-      maxThroughputToSet = Double.MAX_VALUE;
-    } else if (offPeakHours.isOffPeakHour()) {
-      maxThroughputToSet = maxThroughputOffpeak;
-    } else {
-      // compactionPressure is between 0.0 and 1.0, we use a simple linear formula to
-      // calculate the throughput limitation.
-      maxThroughputToSet =
-          maxThroughputLowerBound + (maxThroughputHigherBound - maxThroughputLowerBound)
-              * compactionPressure;
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("compactionPressure is " + compactionPressure + ", tune compaction throughput to "
-          + throughputDesc(maxThroughputToSet));
-    }
-    this.maxThroughput = maxThroughputToSet;
-  }
-
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-    if (conf == null) {
-      return;
-    }
-    this.maxThroughputHigherBound =
-        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND,
-          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_HIGHER_BOUND);
-    this.maxThroughputLowerBound =
-        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND,
-          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_LOWER_BOUND);
-    this.maxThroughputOffpeak =
-        conf.getLong(HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK,
-          DEFAULT_HBASE_HSTORE_COMPACTION_MAX_THROUGHPUT_OFFPEAK);
-    this.offPeakHours = OffPeakHours.getInstance(conf);
-    this.controlPerSize = this.maxThroughputLowerBound;
-    this.maxThroughput = this.maxThroughputLowerBound;
-    this.tuningPeriod =
-        getConf().getInt(HBASE_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD,
-          DEFAULT_HSTORE_COMPACTION_THROUGHPUT_TUNE_PERIOD);
-    LOG.info("Compaction throughput configurations, higher bound: "
-        + throughputDesc(maxThroughputHigherBound) + ", lower bound "
-        + throughputDesc(maxThroughputLowerBound) + ", off peak: "
-        + throughputDesc(maxThroughputOffpeak) + ", tuning period: " + tuningPeriod + " ms");
-  }
-
-  private String throughputDesc(long deltaSize, long elapsedTime) {
-    return throughputDesc((double) deltaSize / elapsedTime * 1000);
-  }
-
-  private String throughputDesc(double speed) {
-    if (speed >= 1E15) { // large enough to say it is unlimited
-      return "unlimited";
-    } else {
-      return String.format("%.2f MB/sec", speed / 1024 / 1024);
-    }
-  }
-
-  @Override
-  public void start(String compactionName) {
-    activeCompactions.put(compactionName, new ActiveCompaction());
-  }
-
-  @Override
-  public long control(String compactionName, long size) throws InterruptedException {
-    ActiveCompaction compaction = activeCompactions.get(compactionName);
-    compaction.totalSize += size;
-    long deltaSize = compaction.totalSize - compaction.lastControlSize;
-    if (deltaSize < controlPerSize) {
-      return 0;
-    }
-    long now = EnvironmentEdgeManager.currentTime();
-    double maxThroughputPerCompaction = this.maxThroughput / activeCompactions.size();
-    long minTimeAllowed = (long) (deltaSize / maxThroughputPerCompaction * 1000); // ms
-    long elapsedTime = now - compaction.lastControlTime;
-    compaction.lastControlSize = compaction.totalSize;
-    if (elapsedTime >= minTimeAllowed) {
-      compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
-      return 0;
-    }
-    // too fast
-    long sleepTime = minTimeAllowed - elapsedTime;
-    if (LOG.isDebugEnabled()) {
-      // do not log too much
-      if (now - compaction.lastLogTime > 60L * 1000) {
-        LOG.debug(compactionName + " sleep " + sleepTime + " ms because current throughput is "
-            + throughputDesc(deltaSize, elapsedTime) + ", max allowed is "
-            + throughputDesc(maxThroughputPerCompaction) + ", already slept "
-            + compaction.numberOfSleeps + " time(s) and total slept time is "
-            + compaction.totalSleepTime + " ms till now.");
-        compaction.lastLogTime = now;
-      }
-    }
-    Thread.sleep(sleepTime);
-    compaction.numberOfSleeps++;
-    compaction.totalSleepTime += sleepTime;
-    compaction.lastControlTime = EnvironmentEdgeManager.currentTime();
-    return sleepTime;
-  }
-
-  @Override
-  public void finish(String compactionName) {
-    ActiveCompaction compaction = activeCompactions.remove(compactionName);
-    long elapsedTime = Math.max(1, EnvironmentEdgeManager.currentTime() - compaction.startTime);
-    LOG.info(compactionName + " average throughput is "
-        + throughputDesc(compaction.totalSize, elapsedTime) + ", slept "
-        + compaction.numberOfSleeps + " time(s) and total slept time is "
-        + compaction.totalSleepTime + " ms. " + activeCompactions.size()
-        + " active compactions remaining, total limit is " + throughputDesc(maxThroughput));
-  }
-
-  private volatile boolean stopped = false;
-
-  @Override
-  public void stop(String why) {
-    stopped = true;
-  }
-
-  @Override
-  public boolean isStopped() {
-    return stopped;
-  }
-
-  @Override
-  public String toString() {
-    return "DefaultCompactionThroughputController [maxThroughput=" + throughputDesc(maxThroughput)
-        + ", activeCompactions=" + activeCompactions.size() + "]";
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 5f024b8..2bb8fc8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
 import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ConcatenatedLists;
@@ -392,7 +393,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
     protected byte[] majorRangeFromRow = null, majorRangeToRow = null;
 
     public List<Path> execute(StripeCompactor compactor,
-      CompactionThroughputController throughputController) throws IOException {
+      ThroughputController throughputController) throws IOException {
       return execute(compactor, throughputController, null);
     }
     /**
@@ -402,7 +403,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
      * @return result of compact(...)
      */
     public abstract List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController, User user) throws IOException;
+        ThroughputController throughputController, User user) throws IOException;
 
     public StripeCompactionRequest(CompactionRequest request) {
       this.request = request;
@@ -454,7 +455,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
 
     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController, User user) throws IOException {
+        ThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetBoundaries, this.majorRangeFromRow,
         this.majorRangeToRow, throughputController, user);
     }
@@ -505,7 +506,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
 
     @Override
     public List<Path> execute(StripeCompactor compactor,
-        CompactionThroughputController throughputController, User user) throws IOException {
+        ThroughputController throughputController, User user) throws IOException {
       return compactor.compact(this.request, this.targetCount, this.targetKvs, this.startRow,
         this.endRow, this.majorRangeFromRow, this.majorRangeToRow, throughputController, user);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
index 021965c..fd0e2b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -53,14 +54,14 @@ public class StripeCompactor extends Compactor {
 
   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
     byte[] majorRangeFromRow, byte[] majorRangeToRow,
-    CompactionThroughputController throughputController) throws IOException {
+    ThroughputController throughputController) throws IOException {
     return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
       throughputController, null);
   }
 
   public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController, User user) throws IOException {
+      ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       StringBuilder sb = new StringBuilder();
       sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@@ -77,14 +78,14 @@ public class StripeCompactor extends Compactor {
 
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
     byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
-    CompactionThroughputController throughputController) throws IOException {
+    ThroughputController throughputController) throws IOException {
     return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
       majorRangeToRow, throughputController, null);
   }
 
   public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
       byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController, User user) throws IOException {
+      ThroughputController throughputController, User user) throws IOException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Executing compaction with " + targetSize
           + " target file size, no more than " + targetCount + " files, in ["
@@ -98,7 +99,7 @@ public class StripeCompactor extends Compactor {
 
   private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
       byte[] majorRangeFromRow, byte[] majorRangeToRow,
-      CompactionThroughputController throughputController, User user) throws IOException {
+      ThroughputController throughputController, User user) throws IOException {
     final Collection<StoreFile> filesToCompact = request.getFiles();
     final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
     this.progress = new CompactionProgress(fd.maxKeyCount);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
new file mode 100644
index 0000000..4b1bb00
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/CompactionThroughputControllerFactory.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Factory for the compaction {@link ThroughputController} named in the configuration. */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public final class CompactionThroughputControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(CompactionThroughputControllerFactory.class);
+
+  public static final String HBASE_THROUGHPUT_CONTROLLER_KEY =
+      "hbase.regionserver.throughput.controller";
+
+  private CompactionThroughputControllerFactory() {
+  }
+
+  private static final Class<? extends ThroughputController>
+      DEFAULT_THROUGHPUT_CONTROLLER_CLASS = PressureAwareCompactionThroughputController.class;
+
+  // Class names from the old compactions package; accepted for backward compatibility only
+  private static final String DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS =
+      "org.apache.hadoop.hbase.regionserver.compactions."
+          + "PressureAwareCompactionThroughputController";
+  private static final String DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS =
+      "org.apache.hadoop.hbase.regionserver.compactions.NoLimitCompactionThroughputController";
+
+  /** Instantiates the configured controller class and calls setup(server) on it. */
+  public static ThroughputController create(RegionServerServices server, Configuration conf) {
+    Class<? extends ThroughputController> clazz = getThroughputControllerClass(conf);
+    ThroughputController controller = ReflectionUtils.newInstance(clazz, conf);
+    controller.setup(server);
+    return controller;
+  }
+
+  /** Loads the configured controller class; falls back to the default if loading fails. */
+  public static Class<? extends ThroughputController> getThroughputControllerClass(
+      Configuration conf) {
+    String className =
+        conf.get(HBASE_THROUGHPUT_CONTROLLER_KEY, DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName());
+    className = resolveDeprecatedClassName(className);
+    try {
+      return Class.forName(className).asSubclass(ThroughputController.class);
+    } catch (Exception e) {
+      LOG.warn("Unable to load configured throughput controller '" + className
+          + "', load default throughput controller "
+          + DEFAULT_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", e);
+      return DEFAULT_THROUGHPUT_CONTROLLER_CLASS;
+    }
+  }
+
+  /**
+   * Resolve deprecated class name to keep backward compatibility
+   * @param oldName configured class name, possibly one of the deprecated names
+   * @return the replacement class name, or oldName itself when it is not a deprecated name
+   */
+  private static String resolveDeprecatedClassName(String oldName) {
+    String className = oldName;
+    if (className.equals(DEPRECATED_NAME_OF_PRESSURE_AWARE_THROUGHPUT_CONTROLLER_CLASS)) {
+      className = PressureAwareCompactionThroughputController.class.getName();
+    } else if (className.equals(DEPRECATED_NAME_OF_NO_LIMIT_THROUGHPUT_CONTROLLER_CLASS)) {
+      className = NoLimitThroughputController.class.getName();
+    }
+    if (!className.equals(oldName)) {
+      LOG.warn(oldName + " is deprecated, please use " + className + " instead");
+    }
+    return className;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
new file mode 100644
index 0000000..fa4c9aa
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/FlushThroughputControllerFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/** Factory for the flush {@link ThroughputController} named in the configuration. */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public final class FlushThroughputControllerFactory {
+
+  private static final Log LOG = LogFactory.getLog(FlushThroughputControllerFactory.class);
+
+  public static final String HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY =
+      "hbase.regionserver.flush.throughput.controller";
+
+  private static final Class<? extends ThroughputController>
+      DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS = NoLimitThroughputController.class;
+
+  private FlushThroughputControllerFactory() {
+  }
+
+  /** Instantiates the configured controller class and calls setup(server) on it. */
+  public static ThroughputController create(RegionServerServices server, Configuration conf) {
+    Class<? extends ThroughputController> controllerClass = getThroughputControllerClass(conf);
+    ThroughputController instance = ReflectionUtils.newInstance(controllerClass, conf);
+    instance.setup(server);
+    return instance;
+  }
+
+  /** Loads the configured controller class; falls back to the default if loading fails. */
+  public static Class<? extends ThroughputController> getThroughputControllerClass(
+      Configuration conf) {
+    String configuredName = conf.get(HBASE_FLUSH_THROUGHPUT_CONTROLLER_KEY,
+      DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName());
+    try {
+      return Class.forName(configuredName).asSubclass(ThroughputController.class);
+    } catch (Exception ex) {
+      LOG.warn("Unable to load configured flush throughput controller '" + configuredName
+          + "', load default throughput controller "
+          + DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS.getName() + " instead", ex);
+      return DEFAULT_FLUSH_THROUGHPUT_CONTROLLER_CLASS;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3b1ce99/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
new file mode 100644
index 0000000..a6a8eb9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/throttle/NoLimitThroughputController.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.throttle;
+
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+
+/** A dummy {@link ThroughputController} that does nothing; control() always returns 0. */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class NoLimitThroughputController implements ThroughputController {
+  public static final NoLimitThroughputController INSTANCE = new NoLimitThroughputController();
+
+  @Override
+  public void setup(RegionServerServices server) {
+  }
+
+  @Override
+  public void start(String compactionName) {
+  }
+
+  @Override
+  public long control(String compactionName, long size) throws InterruptedException {
+    return 0;
+  }
+
+  @Override
+  public void finish(String compactionName) {
+  }
+
+  private volatile boolean stopped; // volatile: stop() may run on another thread
+
+  @Override
+  public void stop(String why) {
+    stopped = true;
+  }
+
+  @Override
+  public boolean isStopped() {
+    return stopped;
+  }
+
+  @Override
+  public String toString() {
+    return "NoLimitThroughputController";
+  }
+}


Mime
View raw message