hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ramkris...@apache.org
Subject hbase git commit: HBASE-13686 - Fail to limit rate in RateLimiter (Ashish Singhi)
Date Sun, 07 Jun 2015 16:56:15 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 deca3930b -> d34e9c5c5


HBASE-13686 - Fail to limit rate in RateLimiter (Ashish Singhi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d34e9c5c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d34e9c5c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d34e9c5c

Branch: refs/heads/branch-1
Commit: d34e9c5c5ccc211999f057cdc50a6916d87b7b15
Parents: deca393
Author: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Authored: Sun Jun 7 22:25:32 2015 +0530
Committer: ramkrishna <ramkrishna.s.vasudevan@gmail.com>
Committed: Sun Jun 7 22:25:32 2015 +0530

----------------------------------------------------------------------
 .../hbase/quotas/ThrottlingException.java       |  16 ++-
 .../quotas/AverageIntervalRateLimiter.java      |  65 ++++++++++
 .../hbase/quotas/FixedIntervalRateLimiter.java  |  56 +++++++++
 .../apache/hadoop/hbase/quotas/RateLimiter.java | 124 ++++++++++++-------
 .../hadoop/hbase/quotas/TimeBasedLimiter.java   |  83 ++++++++-----
 .../hadoop/hbase/quotas/TestRateLimiter.java    |  96 +++++++++++---
 6 files changed, 341 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java
index 547f902..e9fe172 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/quotas/ThrottlingException.java
@@ -30,13 +30,14 @@ public class ThrottlingException extends QuotaExceededException {
   @InterfaceAudience.Public
   @InterfaceStability.Evolving
   public enum Type {
-    NumRequestsExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded, WriteSizeExceeded,
-    ReadSizeExceeded,
+    NumRequestsExceeded, RequestSizeExceeded, NumReadRequestsExceeded, NumWriteRequestsExceeded,
+    WriteSizeExceeded, ReadSizeExceeded,
   }
 
-  private static final String[] MSG_TYPE = new String[] { "number of requests exceeded",
-      "number of read requests exceeded", "number of write requests exceeded",
-      "write size limit exceeded", "read size limit exceeded", };
+  private static final String[] MSG_TYPE =
+      new String[] { "number of requests exceeded", "request size limit exceeded",
+          "number of read requests exceeded", "number of write requests exceeded",
+          "write size limit exceeded", "read size limit exceeded", };
 
   private static final String MSG_WAIT = " - wait ";
 
@@ -76,6 +77,11 @@ public class ThrottlingException extends QuotaExceededException {
   public static void throwNumRequestsExceeded(final long waitInterval) throws ThrottlingException {
     throwThrottlingException(Type.NumRequestsExceeded, waitInterval);
   }
+  
+  public static void throwRequestSizeExceeded(final long waitInterval)
+      throws ThrottlingException {
+    throwThrottlingException(Type.RequestSizeExceeded, waitInterval);
+  }
 
   public static void throwNumReadRequestsExceeded(final long waitInterval)
       throws ThrottlingException {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
new file mode 100644
index 0000000..a0cd71b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/AverageIntervalRateLimiter.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This limiter will refill resources at every TimeUnit/resources interval. For example: For a
+ * limiter configured with 10resources/second, then 1 resource will be refilled after every 100ms
+ * (1sec/10resources)
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class AverageIntervalRateLimiter extends RateLimiter {
+  private long nextRefillTime = -1L;
+
+  @Override
+  public long refill(long limit, long available) {
+    final long now = EnvironmentEdgeManager.currentTime();
+    if (nextRefillTime == -1) {
+      // Till now no resource has been consumed.
+      nextRefillTime = EnvironmentEdgeManager.currentTime();
+      return limit;
+    }
+
+    long delta = (limit * (now - nextRefillTime)) / super.getTimeUnitInMillis();
+    if (delta > 0) {
+      this.nextRefillTime = now;
+      return Math.min(limit, available + delta);
+    }
+    return 0;
+  }
+
+  @Override
+  public long getWaitInterval(long limit, long available, long amount) {
+    if (nextRefillTime == -1) {
+      return 0;
+    }
+    long timeUnitInMillis = super.getTimeUnitInMillis();
+    return ((amount * timeUnitInMillis) / limit) - ((available * timeUnitInMillis) / limit);
+  }
+
+  // This method is for strictly testing purpose only
+  @VisibleForTesting
+  public void setNextRefillTime(long nextRefillTime) {
+    this.nextRefillTime = nextRefillTime;
+  }
+
+  @VisibleForTesting
+  public long getNextRefillTime() {
+    return this.nextRefillTime;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java
new file mode 100644
index 0000000..0b05798
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/FixedIntervalRateLimiter.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.quotas;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * With this limiter resources will be refilled only after a fixed interval of time.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class FixedIntervalRateLimiter extends RateLimiter {
+  private long nextRefillTime = -1L;
+
+  @Override
+  public long refill(long limit, long available) {
+    final long now = EnvironmentEdgeManager.currentTime();
+    if (now < nextRefillTime) {
+      return 0;
+    }
+    nextRefillTime = now + super.getTimeUnitInMillis();
+    return limit;
+  }
+
+  @Override
+  public long getWaitInterval(long limit, long available, long amount) {
+    if (nextRefillTime == -1) {
+      return 0;
+    }
+    final long now = EnvironmentEdgeManager.currentTime();
+    final long refillTime = nextRefillTime;
+    return refillTime - now;
+  }
+
+  // This method is for strictly testing purpose only
+  @VisibleForTesting
+  public void setNextRefillTime(long nextRefillTime) {
+    this.nextRefillTime = nextRefillTime;
+  }
+
+  @VisibleForTesting
+  public long getNextRefillTime() {
+    return this.nextRefillTime;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
index 5b81269..0d8e51e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -16,36 +16,58 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
- * Simple rate limiter. Usage Example: RateLimiter limiter = new RateLimiter(); // At this point you
- * have a unlimited resource limiter limiter.set(10, TimeUnit.SECONDS); // set 10 resources/sec long
- * lastTs = 0; // You need to keep track of the last update timestamp while (true) { long now =
- * System.currentTimeMillis(); // call canExecute before performing resource consuming operation
- * bool canExecute = limiter.canExecute(now, lastTs); // If there are no available resources, wait
- * until one is available if (!canExecute) Thread.sleep(limiter.waitInterval()); // ...execute the
- * work and consume the resource... limiter.consume(); }
+ * Simple rate limiter.
+ *
+ * Usage Example:
+ *    // At this point you have a unlimited resource limiter
+ *   RateLimiter limiter = new AverageIntervalRateLimiter();
+ *                         or new FixedIntervalRateLimiter();
+ *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
+ *
+ *   while (true) {
+ *     // call canExecute before performing resource consuming operation
+ *     bool canExecute = limiter.canExecute();
+ *     // If there are no available resources, wait until one is available
+ *     if (!canExecute) Thread.sleep(limiter.waitInterval());
+ *     // ...execute the work and consume the resource...
+ *     limiter.consume();
+ *   }
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class RateLimiter {
-  private long tunit = 1000; // Timeunit factor for translating to ms.
+public abstract class RateLimiter {
+  public static final String QUOTA_RATE_LIMITER_CONF_KEY = "hbase.quota.rate.limiter";
+  private long tunit = 1000;           // Timeunit factor for translating to ms.
   private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
   private long avail = Long.MAX_VALUE; // Currently available resource units
 
-  public RateLimiter() {
-  }
+  /**
+   * Refill the available units w.r.t the elapsed time.
+   * @param limit Maximum available resource units that can be refilled to.
+   * @param available Currently available resource units
+   */
+  abstract long refill(long limit, long available);
+
+  /**
+   * Time in milliseconds to wait for before requesting to consume 'amount' resource.
+   * @param limit Maximum available resource units that can be refilled to.
+   * @param available Currently available resource units
+   * @param amount Resources for which time interval to calculate for
+   * @return estimate of the ms required to wait before being able to provide 'amount' resources.
+   */
+  abstract long getWaitInterval(long limit, long available, long amount);
+
 
   /**
    * Set the RateLimiter max available resources and refill period.
    * @param limit The max value available resource units can be refilled to.
    * @param timeUnit Timeunit factor for translating to ms.
    */
-  public synchronized void set(final long limit, final TimeUnit timeUnit) {
+  public void set(final long limit, final TimeUnit timeUnit) {
     switch (timeUnit) {
-    case NANOSECONDS:
-      throw new RuntimeException("Unsupported NANOSECONDS TimeUnit");
-    case MICROSECONDS:
-      throw new RuntimeException("Unsupported MICROSECONDS TimeUnit");
     case MILLISECONDS:
       tunit = 1;
       break;
@@ -62,23 +84,25 @@ public class RateLimiter {
       tunit = 24 * 60 * 60 * 1000;
       break;
     default:
-      throw new RuntimeException("Invalid TimeUnit " + timeUnit);
+      throw new RuntimeException("Unsupported " + timeUnit.name() + " TimeUnit.");
     }
     this.limit = limit;
     this.avail = limit;
   }
 
   public String toString() {
+    String rateLimiter = this.getClass().getSimpleName();
     if (limit == Long.MAX_VALUE) {
-      return "RateLimiter(Bypass)";
+      return rateLimiter + "(Bypass)";
     }
-    return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
+    return rateLimiter + "(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
   }
 
   /**
-   * Sets the current instance of RateLimiter to a new values. if current limit is smaller than the
-   * new limit, bump up the available resources. Otherwise allow clients to use up the previously
-   * available resources.
+   * Sets the current instance of RateLimiter to a new values.
+   *
+   * if current limit is smaller than the new limit, bump up the available resources.
+   * Otherwise allow clients to use up the previously available resources.
    */
   public synchronized void update(final RateLimiter other) {
     this.tunit = other.tunit;
@@ -100,25 +124,38 @@ public class RateLimiter {
     return avail;
   }
 
+  protected long getTimeUnitInMillis() {
+    return tunit;
+  }
+
   /**
-   * given the time interval, is there at least one resource available to allow execution?
-   * @param now the current timestamp
-   * @param lastTs the timestamp of the last update
+   * Is there at least one resource available to allow execution?
    * @return true if there is at least one resource available, otherwise false
    */
-  public boolean canExecute(final long now, final long lastTs) {
-    return canExecute(now, lastTs, 1);
+  public boolean canExecute() {
+    return canExecute(1);
   }
 
   /**
-   * given the time interval, are there enough available resources to allow execution?
-   * @param now the current timestamp
-   * @param lastTs the timestamp of the last update
+   * Are there enough available resources to allow execution?
    * @param amount the number of required resources
    * @return true if there are enough available resources, otherwise false
    */
-  public synchronized boolean canExecute(final long now, final long lastTs, final long amount) {
-    return avail >= amount ? true : refill(now, lastTs) >= amount;
+  public synchronized boolean canExecute(final long amount) {
+    long refillAmount = refill(limit, avail);
+    if (refillAmount == 0 && avail < amount) {
+      return false;
+    }
+    // check for positive overflow
+    if (avail <= Long.MAX_VALUE - refillAmount) {
+      avail = Math.max(0, Math.min(avail + refillAmount, limit));
+    } else {
+      avail = Math.max(0, limit);
+    }
+    if (avail >= amount) {
+      return true;
+    }
+    return false;
   }
 
   /**
@@ -134,6 +171,9 @@ public class RateLimiter {
    */
   public synchronized void consume(final long amount) {
     this.avail -= amount;
+    if (this.avail < 0) {
+      this.avail = 0;
+    }
   }
 
   /**
@@ -148,18 +188,16 @@ public class RateLimiter {
    */
   public synchronized long waitInterval(final long amount) {
     // TODO Handle over quota?
-    return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit);
+    return (amount <= avail) ? 0 : getWaitInterval(limit, avail, amount);
   }
 
-  /**
-   * given the specified time interval, refill the avilable units to the proportionate to elapsed
-   * time or to the prespecified limit.
-   */
-  private long refill(final long now, final long lastTs) {
-    long delta = (limit * (now - lastTs)) / tunit;
-    if (delta > 0) {
-      avail = Math.min(limit, avail + delta);
-    }
-    return avail;
+  // This method is for strictly testing purpose only
+  @VisibleForTesting
+  public void setNextRefillTime(long nextRefillTime) {
+    this.setNextRefillTime(nextRefillTime);
+  }
+
+  public long getNextRefillTime() {
+    return this.getNextRefillTime();
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
index 4e31f82..beb4147 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -1,16 +1,26 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.hadoop.hbase.quotas;
 
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -18,26 +28,40 @@ import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
 import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
 import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
 import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
 /**
  * Simple time based limiter that checks the quota Throttle
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public final class TimeBasedLimiter implements QuotaLimiter {
-  private long writeLastTs = 0;
-  private long readLastTs = 0;
-
-  private RateLimiter reqsLimiter = new RateLimiter();
-  private RateLimiter reqSizeLimiter = new RateLimiter();
-  private RateLimiter writeReqsLimiter = new RateLimiter();
-  private RateLimiter writeSizeLimiter = new RateLimiter();
-  private RateLimiter readReqsLimiter = new RateLimiter();
-  private RateLimiter readSizeLimiter = new RateLimiter();
+public class TimeBasedLimiter implements QuotaLimiter {
+  private static final Configuration conf = HBaseConfiguration.create();
+  private RateLimiter reqsLimiter = null;
+  private RateLimiter reqSizeLimiter = null;
+  private RateLimiter writeReqsLimiter = null;
+  private RateLimiter writeSizeLimiter = null;
+  private RateLimiter readReqsLimiter = null;
+  private RateLimiter readSizeLimiter = null;
   private AvgOperationSize avgOpSize = new AvgOperationSize();
 
   private TimeBasedLimiter() {
+    if (FixedIntervalRateLimiter.class.getName().equals(
+      conf.getClass(RateLimiter.QUOTA_RATE_LIMITER_CONF_KEY, AverageIntervalRateLimiter.class)
+          .getName())) {
+      reqsLimiter = new FixedIntervalRateLimiter();
+      reqSizeLimiter = new FixedIntervalRateLimiter();
+      writeReqsLimiter = new FixedIntervalRateLimiter();
+      writeSizeLimiter = new FixedIntervalRateLimiter();
+      readReqsLimiter = new FixedIntervalRateLimiter();
+      readSizeLimiter = new FixedIntervalRateLimiter();
+    } else {
+      reqsLimiter = new AverageIntervalRateLimiter();
+      reqSizeLimiter = new AverageIntervalRateLimiter();
+      writeReqsLimiter = new AverageIntervalRateLimiter();
+      writeSizeLimiter = new AverageIntervalRateLimiter();
+      readReqsLimiter = new AverageIntervalRateLimiter();
+      readSizeLimiter = new AverageIntervalRateLimiter();
+    }
   }
 
   static QuotaLimiter fromThrottle(final Throttle throttle) {
@@ -90,31 +114,28 @@ public final class TimeBasedLimiter implements QuotaLimiter {
 
   @Override
   public void checkQuota(long writeSize, long readSize) throws ThrottlingException {
-    long now = EnvironmentEdgeManager.currentTime();
-    long lastTs = Math.max(readLastTs, writeLastTs);
-
-    if (!reqsLimiter.canExecute(now, lastTs)) {
+    if (!reqsLimiter.canExecute()) {
       ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
     }
-    if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) {
-      ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter
+    if (!reqSizeLimiter.canExecute(writeSize + readSize)) {
+      ThrottlingException.throwRequestSizeExceeded(reqSizeLimiter
           .waitInterval(writeSize + readSize));
     }
 
     if (writeSize > 0) {
-      if (!writeReqsLimiter.canExecute(now, writeLastTs)) {
+      if (!writeReqsLimiter.canExecute()) {
         ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
       }
-      if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) {
+      if (!writeSizeLimiter.canExecute(writeSize)) {
         ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
       }
     }
 
     if (readSize > 0) {
-      if (!readReqsLimiter.canExecute(now, readLastTs)) {
+      if (!readReqsLimiter.canExecute()) {
         ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
       }
-      if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) {
+      if (!readSizeLimiter.canExecute(readSize)) {
         ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
       }
     }
@@ -124,20 +145,16 @@ public final class TimeBasedLimiter implements QuotaLimiter {
   public void grabQuota(long writeSize, long readSize) {
     assert writeSize != 0 || readSize != 0;
 
-    long now = EnvironmentEdgeManager.currentTime();
-
     reqsLimiter.consume(1);
     reqSizeLimiter.consume(writeSize + readSize);
 
     if (writeSize > 0) {
       writeReqsLimiter.consume(1);
       writeSizeLimiter.consume(writeSize);
-      writeLastTs = now;
     }
     if (readSize > 0) {
       readReqsLimiter.consume(1);
       readSizeLimiter.consume(readSize);
-      readLastTs = now;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d34e9c5c/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
index 765f321..d2c1507 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java
@@ -48,16 +48,14 @@ public class TestRateLimiter {
 
   private void testWaitInterval(final TimeUnit timeUnit, final long limit,
       final long expectedWaitInterval) {
-    RateLimiter limiter = new RateLimiter();
+    RateLimiter limiter = new AverageIntervalRateLimiter();
     limiter.set(limit, timeUnit);
 
     long nowTs = 0;
-    long lastTs = 0;
-
     // consume all the available resources, one request at the time.
     // the wait interval should be 0
     for (int i = 0; i < (limit - 1); ++i) {
-      assertTrue(limiter.canExecute(nowTs, lastTs));
+      assertTrue(limiter.canExecute());
       limiter.consume();
       long waitInterval = limiter.waitInterval();
       assertEquals(0, waitInterval);
@@ -66,40 +64,102 @@ public class TestRateLimiter {
     for (int i = 0; i < (limit * 4); ++i) {
       // There is one resource available, so we should be able to
       // consume it without waiting.
-      assertTrue(limiter.canExecute(nowTs, lastTs));
+      limiter.setNextRefillTime(limiter.getNextRefillTime() - nowTs);
+      assertTrue(limiter.canExecute());
       assertEquals(0, limiter.waitInterval());
       limiter.consume();
-      lastTs = nowTs;
-
       // No more resources are available, we should wait for at least an interval.
       long waitInterval = limiter.waitInterval();
       assertEquals(expectedWaitInterval, waitInterval);
 
       // set the nowTs to be the exact time when resources should be available again.
-      nowTs += waitInterval;
+      nowTs = waitInterval;
 
       // artificially go into the past to prove that when too early we should fail.
-      assertFalse(limiter.canExecute(nowTs - 500, lastTs));
+      long temp = nowTs + 500;
+      limiter.setNextRefillTime(limiter.getNextRefillTime() + temp);
+      assertFalse(limiter.canExecute());
+      //Roll back the nextRefillTime set to continue further testing
+      limiter.setNextRefillTime(limiter.getNextRefillTime() - temp);
     }
   }
 
   @Test
-  public void testOverconsumption() {
-    RateLimiter limiter = new RateLimiter();
+  public void testOverconsumptionAverageIntervalRefillStrategy() {
+    RateLimiter limiter = new AverageIntervalRateLimiter();
     limiter.set(10, TimeUnit.SECONDS);
 
     // 10 resources are available, but we need to consume 20 resources
     // Verify that we have to wait at least 1.1sec to have 1 resource available
-    assertTrue(limiter.canExecute(0, 0));
+    assertTrue(limiter.canExecute());
     limiter.consume(20);
-    assertEquals(1100, limiter.waitInterval());
+    // To consume 1 resource wait for 100ms
+    assertEquals(100, limiter.waitInterval(1));
+    // To consume 10 resource wait for 1000ms
+    assertEquals(1000, limiter.waitInterval(10));
+
+    limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
+    // Verify that after 1sec the 1 resource is available
+    assertTrue(limiter.canExecute(1));
+    limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
+    // Verify that after 1sec the 10 resource is available
+    assertTrue(limiter.canExecute());
+    assertEquals(0, limiter.waitInterval());
+  }
 
-    // Verify that after 1sec we need to wait for another 0.1sec to get a resource available
-    assertFalse(limiter.canExecute(1000, 0));
-    assertEquals(100, limiter.waitInterval());
+  @Test
+  public void testOverconsumptionFixedIntervalRefillStrategy() throws InterruptedException {
+    RateLimiter limiter = new FixedIntervalRateLimiter();
+    limiter.set(10, TimeUnit.SECONDS);
 
-    // Verify that after 1.1sec the resource is available
-    assertTrue(limiter.canExecute(1100, 0));
+    // 10 resources are available, but we need to consume 20 resources
+    // Verify that we have to wait at least 1.1sec to have 1 resource available
+    assertTrue(limiter.canExecute());
+    limiter.consume(20);
+    // To consume 1 resource also wait for 1000ms
+    assertEquals(1000, limiter.waitInterval(1));
+    // To consume 10 resource wait for 100ms
+    assertEquals(1000, limiter.waitInterval(10));
+
+    limiter.setNextRefillTime(limiter.getNextRefillTime() - 900);
+    // Verify that after 1sec also no resource should be available
+    assertFalse(limiter.canExecute(1));
+    limiter.setNextRefillTime(limiter.getNextRefillTime() - 100);
+
+    // Verify that after 1sec the 10 resource is available
+    assertTrue(limiter.canExecute());
     assertEquals(0, limiter.waitInterval());
   }
+
+  @Test
+  public void testFixedIntervalResourceAvailability() throws Exception {
+    RateLimiter limiter = new FixedIntervalRateLimiter();
+    limiter.set(10, TimeUnit.MILLISECONDS);
+
+    assertTrue(limiter.canExecute(10));
+    limiter.consume(3);
+    assertEquals(7, limiter.getAvailable());
+    assertFalse(limiter.canExecute(10));
+    limiter.setNextRefillTime(limiter.getNextRefillTime() - 3);
+    assertTrue(limiter.canExecute(10));
+    assertEquals(10, limiter.getAvailable());
+  }
+
+  @Test
+  public void testLimiterBySmallerRate() throws InterruptedException {
+    // set limiter is 10 resources per seconds
+    RateLimiter limiter = new FixedIntervalRateLimiter();
+    limiter.set(10, TimeUnit.SECONDS);
+
+    int count = 0; // control the test count
+    while ((count++) < 10) {
+      // test will get 3 resources per 0.5 sec. so it will get 6 resources per sec.
+      limiter.setNextRefillTime(limiter.getNextRefillTime() - 500);
+      for (int i = 0; i < 3; i++) {
+        // 6 resources/sec < limit, so limiter.canExecute(nowTs, lastTs) should be true
+        assertEquals(true, limiter.canExecute());
+        limiter.consume();
+      }
+    }
+  }
 }
\ No newline at end of file


Mime
View raw message