hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [2/7] HBASE-11598 Add simple rpc throttling
Date Thu, 18 Sep 2014 21:45:19 GMT
http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
new file mode 100644
index 0000000..1c41c3d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/QuotaUtil.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.security.UserGroupInformation;
+
+/**
+ * Helper class to interact with the quota table.
+ * Each helper opens (and closes) its own HTable against the quota table per call.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class QuotaUtil extends QuotaTableUtil {
+  private static final Log LOG = LogFactory.getLog(QuotaUtil.class);
+
+  /** Configuration key used to enable/disable quota support (disabled by default). */
+  public static final String QUOTA_CONF_KEY = "hbase.quota.enabled";
+  private static final boolean QUOTA_ENABLED_DEFAULT = false;
+
+  /** Table descriptor for Quota internal table */
+  public static final HTableDescriptor QUOTA_TABLE_DESC =
+    new HTableDescriptor(QUOTA_TABLE_NAME);
+  static {
+    // Both families keep a single version and are excluded from replication.
+    QUOTA_TABLE_DESC.addFamily(
+      new HColumnDescriptor(QUOTA_FAMILY_INFO)
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        .setBloomFilterType(BloomType.ROW)
+        .setMaxVersions(1)
+    );
+    QUOTA_TABLE_DESC.addFamily(
+      new HColumnDescriptor(QUOTA_FAMILY_USAGE)
+        .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+        .setBloomFilterType(BloomType.ROW)
+        .setMaxVersions(1)
+    );
+  }
+
+  /** Returns true if the support for quota is enabled */
+  public static boolean isQuotaEnabled(final Configuration conf) {
+    return conf.getBoolean(QUOTA_CONF_KEY, QUOTA_ENABLED_DEFAULT);
+  }
+
+  /* =========================================================================
+   *  Quota "settings" helpers
+   */
+  /** Stores the quota settings for the specified table. */
+  public static void addTableQuota(final Configuration conf, final TableName table,
+      final Quotas data) throws IOException {
+    addQuotas(conf, getTableRowKey(table), data);
+  }
+
+  /** Removes the quota settings stored for the specified table. */
+  public static void deleteTableQuota(final Configuration conf, final TableName table)
+      throws IOException {
+    deleteQuotas(conf, getTableRowKey(table));
+  }
+
+  /** Stores the quota settings for the specified namespace. */
+  public static void addNamespaceQuota(final Configuration conf, final String namespace,
+      final Quotas data) throws IOException {
+    addQuotas(conf, getNamespaceRowKey(namespace), data);
+  }
+
+  /** Removes the quota settings stored for the specified namespace. */
+  public static void deleteNamespaceQuota(final Configuration conf, final String namespace)
+      throws IOException {
+    deleteQuotas(conf, getNamespaceRowKey(namespace));
+  }
+
+  /** Stores the global quota settings for the specified user. */
+  public static void addUserQuota(final Configuration conf, final String user,
+      final Quotas data) throws IOException {
+    addQuotas(conf, getUserRowKey(user), data);
+  }
+
+  /** Stores the per-table quota settings for the specified user. */
+  public static void addUserQuota(final Configuration conf, final String user,
+      final TableName table, final Quotas data) throws IOException {
+    addQuotas(conf, getUserRowKey(user),
+        getSettingsQualifierForUserTable(table), data);
+  }
+
+  /** Stores the per-namespace quota settings for the specified user. */
+  public static void addUserQuota(final Configuration conf, final String user,
+      final String namespace, final Quotas data) throws IOException {
+    addQuotas(conf, getUserRowKey(user),
+        getSettingsQualifierForUserNamespace(namespace), data);
+  }
+
+  /** Removes the quota settings stored for the specified user (whole user row). */
+  public static void deleteUserQuota(final Configuration conf, final String user)
+      throws IOException {
+    deleteQuotas(conf, getUserRowKey(user));
+  }
+
+  /** Removes the per-table quota settings stored for the specified user. */
+  public static void deleteUserQuota(final Configuration conf, final String user,
+      final TableName table) throws IOException {
+    deleteQuotas(conf, getUserRowKey(user),
+        getSettingsQualifierForUserTable(table));
+  }
+
+  /** Removes the per-namespace quota settings stored for the specified user. */
+  public static void deleteUserQuota(final Configuration conf, final String user,
+      final String namespace) throws IOException {
+    deleteQuotas(conf, getUserRowKey(user),
+        getSettingsQualifierForUserNamespace(namespace));
+  }
+
+  private static void addQuotas(final Configuration conf, final byte[] rowKey,
+      final Quotas data) throws IOException {
+    addQuotas(conf, rowKey, QUOTA_QUALIFIER_SETTINGS, data);
+  }
+
+  // Writes the protobuf-serialized Quotas into the "info" family under the given qualifier.
+  private static void addQuotas(final Configuration conf, final byte[] rowKey,
+      final byte[] qualifier, final Quotas data) throws IOException {
+    Put put = new Put(rowKey);
+    put.add(QUOTA_FAMILY_INFO, qualifier, quotasToData(data));
+    doPut(conf, put);
+  }
+
+  private static void deleteQuotas(final Configuration conf, final byte[] rowKey)
+      throws IOException {
+    deleteQuotas(conf, rowKey, null);
+  }
+
+  // A null qualifier deletes the whole row; otherwise only the specified column is removed.
+  private static void deleteQuotas(final Configuration conf, final byte[] rowKey,
+      final byte[] qualifier) throws IOException {
+    Delete delete = new Delete(rowKey);
+    if (qualifier != null) {
+      delete.deleteColumns(QUOTA_FAMILY_INFO, qualifier);
+    }
+    doDelete(conf, delete);
+  }
+
+  /**
+   * Fetches the quota settings for each of the given user row keys.
+   * Every requested user gets an entry in the returned map (initialized with the
+   * current timestamp); users whose stored data fails to parse are logged and
+   * removed from the result.
+   * @param conf configuration used to connect to the quota table
+   * @param gets one Get per user row key
+   * @return map of user name to its in-memory quota state
+   */
+  public static Map<String, UserQuotaState> fetchUserQuotas(final Configuration conf,
+      final List<Get> gets) throws IOException {
+    long nowTs = EnvironmentEdgeManager.currentTime();
+    Result[] results = doGet(conf, gets);
+
+    Map<String, UserQuotaState> userQuotas = new HashMap<String, UserQuotaState>(results.length);
+    for (int i = 0; i < results.length; ++i) {
+      byte[] key = gets.get(i).getRow();
+      assert isUserRowKey(key);
+      String user = getUserFromRowKey(key);
+
+      // Every requested user is present in the map, even if no quota data was found.
+      final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
+      userQuotas.put(user, quotaInfo);
+
+      if (results[i].isEmpty()) continue;
+      assert Bytes.equals(key, results[i].getRow());
+
+      try {
+        parseUserResult(user, results[i], new UserQuotasVisitor() {
+          @Override
+          public void visitUserQuotas(String userName, String namespace, Quotas quotas) {
+            quotaInfo.setQuotas(namespace, quotas);
+          }
+
+          @Override
+          public void visitUserQuotas(String userName, TableName table, Quotas quotas) {
+            quotaInfo.setQuotas(table, quotas);
+          }
+
+          @Override
+          public void visitUserQuotas(String userName, Quotas quotas) {
+            quotaInfo.setQuotas(quotas);
+          }
+        });
+      } catch (IOException e) {
+        // Drop the user's entry entirely rather than keep a partially-parsed state.
+        LOG.error("Unable to parse user '" + user + "' quotas", e);
+        userQuotas.remove(user);
+      }
+    }
+    return userQuotas;
+  }
+
+  /** Fetches the quota settings for each of the given table row keys. */
+  public static Map<TableName, QuotaState> fetchTableQuotas(final Configuration conf,
+      final List<Get> gets) throws IOException {
+    return fetchGlobalQuotas("table", conf, gets, new KeyFromRow<TableName>() {
+      @Override
+      public TableName getKeyFromRow(final byte[] row) {
+        assert isTableRowKey(row);
+        return getTableFromRowKey(row);
+      }
+    });
+  }
+
+  /** Fetches the quota settings for each of the given namespace row keys. */
+  public static Map<String, QuotaState> fetchNamespaceQuotas(final Configuration conf,
+      final List<Get> gets) throws IOException {
+    return fetchGlobalQuotas("namespace", conf, gets, new KeyFromRow<String>() {
+      @Override
+      public String getKeyFromRow(final byte[] row) {
+        assert isNamespaceRowKey(row);
+        return getNamespaceFromRowKey(row);
+      }
+    });
+  }
+
+  /**
+   * Shared implementation for fetching table/namespace quota settings.
+   * Every requested key gets an entry in the returned map; keys whose stored
+   * data fails to parse are logged and removed from the result.
+   * @param type label used in log messages ("table" or "namespace")
+   * @param conf configuration used to connect to the quota table
+   * @param gets one Get per row key
+   * @param kfr converts a quota-table row key back to the logical key
+   * @return map of key to its in-memory quota state
+   */
+  public static <K> Map<K, QuotaState> fetchGlobalQuotas(final String type,
+      final Configuration conf, final List<Get> gets, final KeyFromRow<K> kfr) throws IOException {
+    long nowTs = EnvironmentEdgeManager.currentTime();
+    Result[] results = doGet(conf, gets);
+
+    Map<K, QuotaState> globalQuotas = new HashMap<K, QuotaState>(results.length);
+    for (int i = 0; i < results.length; ++i) {
+      byte[] row = gets.get(i).getRow();
+      K key = kfr.getKeyFromRow(row);
+
+      // Every requested key is present in the map, even if no quota data was found.
+      QuotaState quotaInfo = new QuotaState(nowTs);
+      globalQuotas.put(key, quotaInfo);
+
+      if (results[i].isEmpty()) continue;
+      assert Bytes.equals(row, results[i].getRow());
+
+      byte[] data = results[i].getValue(QUOTA_FAMILY_INFO, QUOTA_QUALIFIER_SETTINGS);
+      if (data == null) continue;
+
+      try {
+        Quotas quotas = quotasFromData(data);
+        quotaInfo.setQuotas(quotas);
+      } catch (IOException e) {
+        // Drop the entry entirely rather than keep a partially-parsed state.
+        LOG.error("Unable to parse " + type + " '" + key + "' quotas", e);
+        globalQuotas.remove(key);
+      }
+    }
+    return globalQuotas;
+  }
+
+  /** Maps a quota-table row key back to its logical key (e.g. TableName or namespace). */
+  private static interface KeyFromRow<T> {
+    T getKeyFromRow(final byte[] row);
+  }
+
+  /* =========================================================================
+   *  HTable helpers
+   */
+  // Note: opens a new HTable against the quota table for every call.
+  private static void doPut(final Configuration conf, final Put put)
+      throws IOException {
+    HTable table = new HTable(conf, QuotaUtil.QUOTA_TABLE_NAME);
+    try {
+      table.put(put);
+    } finally {
+      table.close();
+    }
+  }
+
+  // Note: opens a new HTable against the quota table for every call.
+  private static void doDelete(final Configuration conf, final Delete delete)
+      throws IOException {
+    HTable table = new HTable(conf, QuotaUtil.QUOTA_TABLE_NAME);
+    try {
+      table.delete(delete);
+    } finally {
+      table.close();
+    }
+  }
+
+  /* =========================================================================
+   *  Data Size Helpers
+   */
+  /** Returns the total serialized length (in bytes) of all cells in the mutation. */
+  public static long calculateMutationSize(final Mutation mutation) {
+    long size = 0;
+    for (Map.Entry<byte [], List<Cell>> entry : mutation.getFamilyCellMap().entrySet()) {
+      for (Cell cell : entry.getValue()) {
+        size += KeyValueUtil.length(cell);
+      }
+    }
+    return size;
+  }
+
+  /** Returns the total serialized length (in bytes) of all cells in the result. */
+  public static long calculateResultSize(final Result result) {
+    long size = 0;
+    for (Cell cell : result.rawCells()) {
+      size += KeyValueUtil.length(cell);
+    }
+    return size;
+  }
+
+  /** Returns the total serialized length (in bytes) of all cells across the results. */
+  public static long calculateResultSize(final List<Result> results) {
+    long size = 0;
+    for (Result result: results) {
+      for (Cell cell : result.rawCells()) {
+        size += KeyValueUtil.length(cell);
+      }
+    }
+    return size;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
new file mode 100644
index 0000000..6c98721
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RateLimiter.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Simple rate limiter.
+ *
+ * Usage Example:
+ *   RateLimiter limiter = new RateLimiter(); // At this point you have an unlimited resource limiter
+ *   limiter.set(10, TimeUnit.SECONDS);       // set 10 resources/sec
+ *
+ *   long lastTs = 0;             // You need to keep track of the last update timestamp
+ *   while (true) {
+ *     long now = System.currentTimeMillis();
+ *
+ *     // call canExecute before performing resource consuming operation
+ *     boolean canExecute = limiter.canExecute(now, lastTs);
+ *     // If there are no available resources, wait until one is available
+ *     if (!canExecute) Thread.sleep(limiter.waitInterval());
+ *     // ...execute the work and consume the resource...
+ *     limiter.consume();
+ *   }
+ *
+ * Note: the caller is responsible for tracking the timestamp of the last
+ * operation ("lastTs"); this class does not record it.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RateLimiter {
+  private long tunit = 1000;           // Timeunit factor for translating to ms.
+  private long limit = Long.MAX_VALUE; // The max value available resource units can be refilled to.
+  private long avail = Long.MAX_VALUE; // Currently available resource units
+
+  // Starts in "bypass" mode: unlimited resources until set() is called.
+  public RateLimiter() {
+  }
+
+  /**
+   * Set the RateLimiter max available resources and refill period.
+   * @param limit The max value available resource units can be refilled to.
+   * @param timeUnit Timeunit factor for translating to ms.
+   */
+  public void set(final long limit, final TimeUnit timeUnit) {
+    // Sub-millisecond units cannot be represented by the ms-based "tunit" factor.
+    switch (timeUnit) {
+      case NANOSECONDS:
+        throw new RuntimeException("Unsupported NANOSECONDS TimeUnit");
+      case MICROSECONDS:
+        throw new RuntimeException("Unsupported MICROSECONDS TimeUnit");
+      case MILLISECONDS:
+        tunit = 1;
+        break;
+      case SECONDS:
+        tunit = 1000;
+        break;
+      case MINUTES:
+        tunit = 60 * 1000;
+        break;
+      case HOURS:
+        tunit = 60 * 60 * 1000;
+        break;
+      case DAYS:
+        tunit = 24 * 60 * 60 * 1000;
+        break;
+    }
+    this.limit = limit;
+    this.avail = limit;
+  }
+
+  public String toString() {
+    if (limit == Long.MAX_VALUE) {
+      return "RateLimiter(Bypass)";
+    }
+    return "RateLimiter(avail=" + avail + " limit=" + limit + " tunit=" + tunit + ")";
+  }
+
+  /**
+   * Sets the current instance of RateLimiter to new values.
+   *
+   * if current limit is smaller than the new limit, bump up the available resources.
+   * Otherwise allow clients to use up the previously available resources.
+   */
+  public synchronized void update(final RateLimiter other) {
+    this.tunit = other.tunit;
+    if (this.limit < other.limit) {
+      this.avail += (other.limit - this.limit);
+    }
+    this.limit = other.limit;
+  }
+
+  /** Returns true if no limit has been set (unlimited resources). */
+  public synchronized boolean isBypass() {
+    return limit == Long.MAX_VALUE;
+  }
+
+  public synchronized long getLimit() {
+    return limit;
+  }
+
+  public synchronized long getAvailable() {
+    return avail;
+  }
+
+  /**
+   * given the time interval, is there at least one resource available to allow execution?
+   * @param now the current timestamp
+   * @param lastTs the timestamp of the last update
+   * @return true if there is at least one resource available, otherwise false
+   */
+  public boolean canExecute(final long now, final long lastTs) {
+    return canExecute(now, lastTs, 1);
+  }
+
+  /**
+   * given the time interval, are there enough available resources to allow execution?
+   * @param now the current timestamp
+   * @param lastTs the timestamp of the last update
+   * @param amount the number of required resources
+   * @return true if there are enough available resources, otherwise false
+   */
+  public synchronized boolean canExecute(final long now, final long lastTs, final long amount) {
+    // Only refill (based on elapsed time) when the current balance is insufficient.
+    return avail >= amount ? true : refill(now, lastTs) >= amount;
+  }
+
+  /**
+   * consume one available unit.
+   */
+  public void consume() {
+    consume(1);
+  }
+
+  /**
+   * consume amount available units.
+   * @param amount the number of units to consume
+   */
+  public synchronized void consume(final long amount) {
+    // May drive "avail" negative; callers are expected to have checked canExecute() first.
+    this.avail -= amount;
+  }
+
+  /**
+   * @return estimate of the ms required to wait before being able to provide 1 resource.
+   */
+  public long waitInterval() {
+    return waitInterval(1);
+  }
+
+  /**
+   * @return estimate of the ms required to wait before being able to provide "amount" resources.
+   */
+  public synchronized long waitInterval(final long amount) {
+    // TODO Handle over quota?
+    // NOTE(review): a limit of 0 would cause division by zero here — confirm set(0, ...) is disallowed upstream.
+    return (amount <= avail) ? 0 : ((amount * tunit) / limit) - ((avail * tunit) / limit);
+  }
+
+  /**
+   * given the specified time interval, refill the available units proportionally
+   * to the elapsed time, capped at the configured limit.
+   */
+  private long refill(final long now, final long lastTs) {
+    long delta = (limit * (now - lastTs)) / tunit;
+    if (delta > 0) {
+      avail = Math.min(limit, avail + delta);
+    }
+    return avail;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
new file mode 100644
index 0000000..ac6108f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/RegionServerQuotaManager.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.RequestContext;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Region Server Quota Manager.
+ * It is responsible for providing access to the quota information of each user/table.
+ *
+ * The direct user of this class is the RegionServer that will get and check the
+ * user/table quota for each operation (put, get, scan).
+ * For system tables, and for users/tables whose effective limiters are all in
+ * bypass (i.e. no quota configured), the quota check is a noop.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class RegionServerQuotaManager {
+  private static final Log LOG = LogFactory.getLog(RegionServerQuotaManager.class);
+
+  private final RegionServerServices rsServices;
+
+  // null until start() runs with quota support enabled; doubles as the "enabled" flag.
+  private QuotaCache quotaCache = null;
+
+  public RegionServerQuotaManager(final RegionServerServices rsServices) {
+    this.rsServices = rsServices;
+  }
+
+  /**
+   * Initializes the quota cache if quota support is enabled in the configuration.
+   * NOTE(review): the rpcScheduler parameter is currently unused — confirm intent.
+   */
+  public void start(final RpcScheduler rpcScheduler) throws IOException {
+    if (!QuotaUtil.isQuotaEnabled(rsServices.getConfiguration())) {
+      LOG.info("Quota support disabled");
+      return;
+    }
+
+    LOG.info("Initializing quota support");
+
+    // Initialize quota cache
+    quotaCache = new QuotaCache(rsServices);
+    quotaCache.start();
+  }
+
+  /** Stops the quota cache, if quota support was started. */
+  public void stop() {
+    if (isQuotaEnabled()) {
+      quotaCache.stop("shutdown");
+    }
+  }
+
+  /** Returns true if quota support was enabled and start() initialized the cache. */
+  public boolean isQuotaEnabled() {
+    return quotaCache != null;
+  }
+
+  @VisibleForTesting
+  QuotaCache getQuotaCache() {
+    return quotaCache;
+  }
+
+  /**
+   * Returns the quota for an operation.
+   *
+   * @param ugi the user that is executing the operation
+   * @param table the table where the operation will be executed
+   * @return the OperationQuota
+   */
+  public OperationQuota getQuota(final UserGroupInformation ugi, final TableName table) {
+    if (isQuotaEnabled() && !table.isSystemTable()) {
+      UserQuotaState userQuotaState = quotaCache.getUserQuotaState(ugi);
+      QuotaLimiter userLimiter = userQuotaState.getTableLimiter(table);
+      boolean useNoop = userLimiter.isBypass();
+      if (userQuotaState.hasBypassGlobals()) {
+        // bypassGlobals: only the user's own limiter applies; table/namespace limiters are skipped.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" + userLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter);
+        }
+      } else {
+        QuotaLimiter nsLimiter = quotaCache.getNamespaceLimiter(table.getNamespaceAsString());
+        QuotaLimiter tableLimiter = quotaCache.getTableLimiter(table);
+        // Noop only when user, table, and namespace limiters are all in bypass.
+        useNoop &= tableLimiter.isBypass() && nsLimiter.isBypass();
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("get quota for ugi=" + ugi + " table=" + table + " userLimiter=" +
+                    userLimiter + " tableLimiter=" + tableLimiter + " nsLimiter=" + nsLimiter);
+        }
+        if (!useNoop) {
+          return new DefaultOperationQuota(userLimiter, tableLimiter, nsLimiter);
+        }
+      }
+    }
+    return NoopOperationQuota.get();
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param type the operation type
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final HRegion region,
+      final OperationQuota.OperationType type) throws IOException, ThrottlingException {
+    switch (type) {
+      case SCAN:   return checkQuota(region, 0, 0, 1);
+      case GET:    return checkQuota(region, 0, 1, 0);
+      case MUTATE: return checkQuota(region, 1, 0, 0);
+    }
+    throw new RuntimeException("Invalid operation type: " + type);
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param actions the "multi" actions to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  public OperationQuota checkQuota(final HRegion region,
+      final List<ClientProtos.Action> actions) throws IOException, ThrottlingException {
+    int numWrites = 0;
+    int numReads = 0;
+    for (final ClientProtos.Action action: actions) {
+      if (action.hasMutation()) {
+        numWrites++;
+      } else if (action.hasGet()) {
+        numReads++;
+      }
+    }
+    return checkQuota(region, numWrites, numReads, 0);
+  }
+
+  /**
+   * Check the quota for the current (rpc-context) user.
+   * Returns the OperationQuota used to get the available quota and
+   * to report the data/usage of the operation.
+   * @param region the region where the operation will be performed
+   * @param numWrites number of writes to perform
+   * @param numReads number of short-reads to perform
+   * @param numScans number of scan to perform
+   * @return the OperationQuota
+   * @throws ThrottlingException if the operation cannot be executed due to quota exceeded.
+   */
+  private OperationQuota checkQuota(final HRegion region,
+      final int numWrites, final int numReads, final int numScans)
+      throws IOException, ThrottlingException {
+    // Resolve the user from the RPC context, falling back to the process user.
+    UserGroupInformation ugi;
+    if (RequestContext.isInRequestContext()) {
+      ugi = RequestContext.getRequestUser().getUGI();
+    } else {
+      ugi = User.getCurrent().getUGI();
+    }
+    TableName table = region.getTableDesc().getTableName();
+
+    OperationQuota quota = getQuota(ugi, table);
+    try {
+      quota.checkQuota(numWrites, numReads, numScans);
+    } catch (ThrottlingException e) {
+      LOG.debug("Throttling exception for user=" + ugi.getUserName() +
+                " table=" + table + " numWrites=" + numWrites +
+                " numReads=" + numReads + " numScans=" + numScans +
+                ": " + e.getMessage());
+      throw e;
+    }
+    return quota;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
new file mode 100644
index 0000000..7a0017e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/TimeBasedLimiter.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.TimedQuota;
+import org.apache.hadoop.hbase.quotas.OperationQuota.AvgOperationSize;
+import org.apache.hadoop.hbase.quotas.OperationQuota.OperationType;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+/**
+ * Simple time based limiter that checks the quota Throttle.
+ *
+ * NOTE(review): writeLastTs/readLastTs are read and updated here without
+ * synchronization, while the underlying RateLimiter methods are synchronized —
+ * confirm the intended thread-safety model.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class TimeBasedLimiter implements QuotaLimiter {
+  private static final Log LOG = LogFactory.getLog(TimeBasedLimiter.class);
+
+  // Timestamps of the last write/read consumption; used to compute refill intervals.
+  private long writeLastTs = 0;
+  private long readLastTs = 0;
+
+  // One RateLimiter per throttle dimension; each defaults to bypass until configured.
+  private RateLimiter reqsLimiter = new RateLimiter();
+  private RateLimiter reqSizeLimiter = new RateLimiter();
+  private RateLimiter writeReqsLimiter = new RateLimiter();
+  private RateLimiter writeSizeLimiter = new RateLimiter();
+  private RateLimiter readReqsLimiter = new RateLimiter();
+  private RateLimiter readSizeLimiter = new RateLimiter();
+  private AvgOperationSize avgOpSize = new AvgOperationSize();
+
+  // Instances are only created through fromThrottle().
+  private TimeBasedLimiter() {
+  }
+
+  /**
+   * Builds a limiter from the protobuf Throttle settings.
+   * Returns the shared noop limiter when no throttle field is set.
+   */
+  static QuotaLimiter fromThrottle(final Throttle throttle) {
+    TimeBasedLimiter limiter = new TimeBasedLimiter();
+    boolean isBypass = true;
+    if (throttle.hasReqNum()) {
+      setFromTimedQuota(limiter.reqsLimiter, throttle.getReqNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasReqSize()) {
+      setFromTimedQuota(limiter.reqSizeLimiter, throttle.getReqSize());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteNum()) {
+      setFromTimedQuota(limiter.writeReqsLimiter, throttle.getWriteNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasWriteSize()) {
+      setFromTimedQuota(limiter.writeSizeLimiter, throttle.getWriteSize());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadNum()) {
+      setFromTimedQuota(limiter.readReqsLimiter, throttle.getReadNum());
+      isBypass = false;
+    }
+
+    if (throttle.hasReadSize()) {
+      setFromTimedQuota(limiter.readSizeLimiter, throttle.getReadSize());
+      isBypass = false;
+    }
+    return isBypass ? NoopQuotaLimiter.get() : limiter;
+  }
+
+  /** Propagates new limits to each underlying RateLimiter (see RateLimiter.update). */
+  public void update(final TimeBasedLimiter other) {
+    reqsLimiter.update(other.reqsLimiter);
+    reqSizeLimiter.update(other.reqSizeLimiter);
+    writeReqsLimiter.update(other.writeReqsLimiter);
+    writeSizeLimiter.update(other.writeSizeLimiter);
+    readReqsLimiter.update(other.readReqsLimiter);
+    readSizeLimiter.update(other.readSizeLimiter);
+  }
+
+  private static void setFromTimedQuota(final RateLimiter limiter, final TimedQuota timedQuota) {
+    limiter.set(timedQuota.getSoftLimit(), ProtobufUtil.toTimeUnit(timedQuota.getTimeUnit()));
+  }
+
+  /**
+   * Throws a ThrottlingException if any configured limiter cannot satisfy
+   * the requested write/read sizes given the elapsed time since the last operation.
+   */
+  @Override
+  public void checkQuota(long writeSize, long readSize)
+      throws ThrottlingException {
+    long now = EnvironmentEdgeManager.currentTime();
+    // The combined request limiters use the most recent of the two timestamps.
+    long lastTs = Math.max(readLastTs, writeLastTs);
+
+    if (!reqsLimiter.canExecute(now, lastTs)) {
+      ThrottlingException.throwNumRequestsExceeded(reqsLimiter.waitInterval());
+    }
+    if (!reqSizeLimiter.canExecute(now, lastTs, writeSize + readSize)) {
+      ThrottlingException.throwNumRequestsExceeded(reqSizeLimiter.waitInterval(writeSize+readSize));
+    }
+
+    if (writeSize > 0) {
+      if (!writeReqsLimiter.canExecute(now, writeLastTs)) {
+        ThrottlingException.throwNumWriteRequestsExceeded(writeReqsLimiter.waitInterval());
+      }
+      if (!writeSizeLimiter.canExecute(now, writeLastTs, writeSize)) {
+        ThrottlingException.throwWriteSizeExceeded(writeSizeLimiter.waitInterval(writeSize));
+      }
+    }
+
+    if (readSize > 0) {
+      if (!readReqsLimiter.canExecute(now, readLastTs)) {
+        ThrottlingException.throwNumReadRequestsExceeded(readReqsLimiter.waitInterval());
+      }
+      if (!readSizeLimiter.canExecute(now, readLastTs, readSize)) {
+        ThrottlingException.throwReadSizeExceeded(readSizeLimiter.waitInterval(readSize));
+      }
+    }
+  }
+
+  /** Consumes the given write/read sizes and records the operation timestamps. */
+  @Override
+  public void grabQuota(long writeSize, long readSize) {
+    assert writeSize != 0 || readSize != 0;
+
+    long now = EnvironmentEdgeManager.currentTime();
+
+    reqsLimiter.consume(1);
+    reqSizeLimiter.consume(writeSize + readSize);
+
+    if (writeSize > 0) {
+      writeReqsLimiter.consume(1);
+      writeSizeLimiter.consume(writeSize);
+      writeLastTs = now;
+    }
+    if (readSize > 0) {
+      readReqsLimiter.consume(1);
+      readSizeLimiter.consume(readSize);
+      readLastTs = now;
+    }
+  }
+
+  /** Consumes additional write size discovered after grabQuota (no timestamp update). */
+  @Override
+  public void consumeWrite(final long size) {
+    reqSizeLimiter.consume(size);
+    writeSizeLimiter.consume(size);
+  }
+
+  /** Consumes additional read size discovered after grabQuota (no timestamp update). */
+  @Override
+  public void consumeRead(final long size) {
+    reqSizeLimiter.consume(size);
+    readSizeLimiter.consume(size);
+  }
+
+  @Override
+  public boolean isBypass() {
+    // A TimeBasedLimiter is only built when at least one throttle is configured.
+    return false;
+  }
+
+  @Override
+  public long getWriteAvailable() {
+    return writeSizeLimiter.getAvailable();
+  }
+
+  @Override
+  public long getReadAvailable() {
+    return readSizeLimiter.getAvailable();
+  }
+
+  @Override
+  public void addOperationSize(OperationType type, long size) {
+    avgOpSize.addOperationSize(type, size);
+  }
+
+  @Override
+  public long getAvgOperationSize(OperationType type) {
+    return avgOpSize.getAvgOperationSize(type);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("TimeBasedLimiter(");
+    if (!reqsLimiter.isBypass()) builder.append("reqs=" + reqsLimiter);
+    // NOTE(review): "resSize" below looks like a typo for "reqSize" — verify before changing output.
+    if (!reqSizeLimiter.isBypass()) builder.append(" resSize=" + reqSizeLimiter);
+    if (!writeReqsLimiter.isBypass()) builder.append(" writeReqs=" + writeReqsLimiter);
+    if (!writeSizeLimiter.isBypass()) builder.append(" writeSize=" + writeSizeLimiter);
+    if (!readReqsLimiter.isBypass()) builder.append(" readReqs=" + readReqsLimiter);
+    if (!readSizeLimiter.isBypass()) builder.append(" readSize=" + readSizeLimiter);
+    builder.append(')');
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
new file mode 100644
index 0000000..b83e1c9
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/quotas/UserQuotaState.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Throttle;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import com.google.common.collect.Sets;
+
+/**
+ * In-memory state of the quotas assigned to a user: a global (user-level)
+ * limiter inherited from {@link QuotaState}, plus optional per-table and
+ * per-namespace limiters and a bypass-globals flag.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class UserQuotaState extends QuotaState {
+  private static final Log LOG = LogFactory.getLog(UserQuotaState.class);
+
+  // Lazily allocated: null until a quota is set for some namespace/table.
+  private Map<String, QuotaLimiter> namespaceLimiters = null;
+  private Map<TableName, QuotaLimiter> tableLimiters = null;
+  // True if this user's requests bypass the machine-level global quotas.
+  private boolean bypassGlobals = false;
+
+  public UserQuotaState() {
+    super();
+  }
+
+  /** @param updateTs timestamp of the last quota update, used as lastUpdate */
+  public UserQuotaState(final long updateTs) {
+    super(updateTs);
+  }
+
+  @Override
+  public synchronized String toString() {
+    StringBuilder builder = new StringBuilder();
+    builder.append("UserQuotaState(ts=" + lastUpdate);
+    if (bypassGlobals) builder.append(" bypass-globals");
+
+    if (isBypass()) {
+      builder.append(" bypass");
+    } else {
+      // Only mention the global limiter when it is a real (non-noop) limiter.
+      if (globalLimiter != NoopQuotaLimiter.get()) {
+        builder.append(" global-limiter");
+      }
+
+      if (tableLimiters != null && !tableLimiters.isEmpty()) {
+        builder.append(" [");
+        for (TableName table: tableLimiters.keySet()) {
+          builder.append(" " + table);
+        }
+        builder.append(" ]");
+      }
+
+      if (namespaceLimiters != null && !namespaceLimiters.isEmpty()) {
+        builder.append(" [");
+        for (String ns: namespaceLimiters.keySet()) {
+          builder.append(" " + ns);
+        }
+        builder.append(" ]");
+      }
+    }
+    builder.append(')');
+    return builder.toString();
+  }
+
+  /**
+   * @return true if there is no quota information associated to this object
+   */
+  @Override
+  public synchronized boolean isBypass() {
+    return !bypassGlobals &&
+            globalLimiter == NoopQuotaLimiter.get() &&
+            (tableLimiters == null || tableLimiters.isEmpty()) &&
+            (namespaceLimiters == null || namespaceLimiters.isEmpty());
+  }
+
+  /** @return true if this user's requests skip the machine global quotas */
+  public synchronized boolean hasBypassGlobals() {
+    return bypassGlobals;
+  }
+
+  /**
+   * Sets the user-level quota and caches the bypass-globals flag from it.
+   * NOTE(review): unlike the accessors above this is not synchronized —
+   * presumably only called during single-threaded setup; confirm with callers.
+   */
+  @Override
+  public void setQuotas(final Quotas quotas) {
+    super.setQuotas(quotas);
+    bypassGlobals = quotas.getBypassGlobals();
+  }
+
+  /**
+   * Add the quota information of the specified table.
+   * (This operation is part of the QuotaState setup)
+   */
+  public void setQuotas(final TableName table, Quotas quotas) {
+    tableLimiters = setLimiter(tableLimiters, table, quotas);
+  }
+
+  /**
+   * Add the quota information of the specified namespace.
+   * (This operation is part of the QuotaState setup)
+   */
+  public void setQuotas(final String namespace, Quotas quotas) {
+    namespaceLimiters = setLimiter(namespaceLimiters, namespace, quotas);
+  }
+
+  /**
+   * Stores the limiter built from {@code quotas} under {@code key}, creating
+   * the map on first use. A missing or bypass throttle removes the entry
+   * instead, so the map only ever holds effective limiters.
+   */
+  private <K> Map<K, QuotaLimiter> setLimiter(Map<K, QuotaLimiter> limiters,
+      final K key, final Quotas quotas) {
+    if (limiters == null) {
+      limiters = new HashMap<K, QuotaLimiter>();
+    }
+
+    QuotaLimiter limiter = quotas.hasThrottle() ?
+      QuotaLimiterFactory.fromThrottle(quotas.getThrottle()) : null;
+    if (limiter != null && !limiter.isBypass()) {
+      limiters.put(key, limiter);
+    } else {
+      limiters.remove(key);
+    }
+    return limiters;
+  }
+
+  /**
+   * Perform an update of the quota state based on the other quota state object.
+   * (This operation is executed by the QuotaCache)
+   */
+  @Override
+  public synchronized void update(final QuotaState other) {
+    super.update(other);
+
+    if (other instanceof UserQuotaState) {
+      UserQuotaState uOther = (UserQuotaState)other;
+      tableLimiters = updateLimiters(tableLimiters, uOther.tableLimiters);
+      namespaceLimiters = updateLimiters(namespaceLimiters, uOther.namespaceLimiters);
+      bypassGlobals = uOther.bypassGlobals;
+    } else {
+      // The other state carries no user-specific info: drop ours.
+      tableLimiters = null;
+      namespaceLimiters = null;
+      bypassGlobals = false;
+    }
+  }
+
+  /**
+   * Reconciles {@code map} against {@code otherMap}: drops keys no longer
+   * present, merges limiters for shared keys via QuotaLimiterFactory.update(),
+   * and adopts new entries. Returns null when {@code otherMap} is null.
+   * NOTE(review): when {@code map} is null the other state's map is returned
+   * by reference (no copy) — verify the caller owns {@code otherMap} after this.
+   */
+  private static <K> Map<K, QuotaLimiter> updateLimiters(final Map<K, QuotaLimiter> map,
+      final Map<K, QuotaLimiter> otherMap) {
+    if (map == null) {
+      return otherMap;
+    }
+
+    if (otherMap != null) {
+      // To Remove
+      Set<K> toRemove = new HashSet<K>(map.keySet());
+      toRemove.removeAll(otherMap.keySet());
+      map.keySet().removeAll(toRemove);
+
+      // To Update/Add
+      for (final Map.Entry<K, QuotaLimiter> entry: otherMap.entrySet()) {
+        QuotaLimiter limiter = map.get(entry.getKey());
+        if (limiter == null) {
+          limiter = entry.getValue();
+        } else {
+          // Merge so accumulated usage in the existing limiter is preserved.
+          limiter = QuotaLimiterFactory.update(limiter, entry.getValue());
+        }
+        map.put(entry.getKey(), limiter);
+      }
+      return map;
+    }
+    return null;
+  }
+
+  /**
+   * Return the limiter for the specified table associated with this quota.
+   * If the table does not have its own quota limiter the global one will be returned.
+   * In case there is no quota limiter associated with this object a noop limiter will be returned.
+   *
+   * @return the quota limiter for the specified table
+   */
+  public synchronized QuotaLimiter getTableLimiter(final TableName table) {
+    // Record the access time — presumably used by the QuotaCache to evict
+    // stale entries; lastQuery is inherited from QuotaState (confirm there).
+    lastQuery = EnvironmentEdgeManager.currentTime();
+    // Resolution order: table-specific, then namespace, then the user global.
+    if (tableLimiters != null) {
+      QuotaLimiter limiter = tableLimiters.get(table);
+      if (limiter != null) return limiter;
+    }
+    if (namespaceLimiters != null) {
+      QuotaLimiter limiter = namespaceLimiters.get(table.getNamespaceAsString());
+      if (limiter != null) return limiter;
+    }
+    return globalLimiter;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index b60befd..53e2a3b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -111,6 +111,7 @@ import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.Regio
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
@@ -388,6 +389,8 @@ public class HRegionServer extends HasThread implements
 
   private RegionServerProcedureManagerHost rspmHost;
 
+  private RegionServerQuotaManager rsQuotaManager;
+
   // Table level lock manager for locking for region operations
   protected TableLockManager tableLockManager;
 
@@ -707,6 +710,9 @@ public class HRegionServer extends HasThread implements
       nonceManagerChore = this.nonceManager.createCleanupChore(this);
     }
 
+    // Setup the Quota Manager
+    rsQuotaManager = new RegionServerQuotaManager(this);
+
     // Setup RPC client for master communication
     rpcClient = new RpcClient(conf, clusterId, new InetSocketAddress(
       rpcServices.isa.getAddress(), 0));
@@ -759,6 +765,9 @@ public class HRegionServer extends HasThread implements
         // start the snapshot handler and other procedure handlers,
         // since the server is ready to run
         rspmHost.start();
+
+        // Start the Quota Manager
+        rsQuotaManager.start(getRpcServer().getScheduler());
       }
 
       // We registered with the Master.  Go into run mode.
@@ -852,6 +861,11 @@ public class HRegionServer extends HasThread implements
       this.storefileRefresher.interrupt();
     }
 
+    // Stop the quota manager
+    if (rsQuotaManager != null) {
+      rsQuotaManager.stop();
+    }
+
     // Stop the snapshot and other procedure handlers, forcefully killing all running tasks
     if (rspmHost != null) {
       rspmHost.stop(this.abortRequested || this.killed);
@@ -2318,6 +2332,11 @@ public class HRegionServer extends HasThread implements
     return service;
   }
 
+  @Override
+  /** @return this RegionServer's quota manager (created during startup). */
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return rsQuotaManager;
+  }
+
   //
   // Main program and support routines
   //
@@ -2436,6 +2455,22 @@ public class HRegionServer extends HasThread implements
      return tableRegions;
    }
 
+  /**
+   * Gets the online tables in this RS.
+   * This method looks at the in-memory onlineRegions.
+   * @return the set of distinct tables with at least one online region on this RS
+   */
+  @Override
+  public Set<TableName> getOnlineTables() {
+    Set<TableName> tables = new HashSet<TableName>();
+    // Snapshot under the onlineRegions lock so regions opening/closing
+    // concurrently cannot corrupt the iteration.
+    synchronized (this.onlineRegions) {
+      for (HRegion region: this.onlineRegions.values()) {
+        tables.add(region.getTableDesc().getTableName());
+      }
+    }
+    return tables;
+  }
+
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getRegionServerCoprocessors() {
     TreeSet<String> coprocessors = new TreeSet<String>(

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index db9aecd..ef5790e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -143,6 +143,9 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.CompactionDescriptor;
+import org.apache.hadoop.hbase.quotas.OperationQuota;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
+import org.apache.hadoop.hbase.quotas.ThrottlingException;
 import org.apache.hadoop.hbase.regionserver.HRegion.Operation;
 import org.apache.hadoop.hbase.regionserver.Leases.LeaseStillHeldException;
 import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
@@ -392,10 +395,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * bypassed as indicated by RegionObserver, null otherwise
    * @throws IOException
    */
-  private Result append(final HRegion region, final MutationProto m,
+  private Result append(final HRegion region, final OperationQuota quota, final MutationProto m,
       final CellScanner cellScanner, long nonceGroup) throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Append append = ProtobufUtil.toAppend(m, cellScanner);
+    quota.addMutation(append);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preAppend(append);
@@ -428,10 +432,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @return the Result
    * @throws IOException
    */
-  private Result increment(final HRegion region, final MutationProto mutation,
-      final CellScanner cells, long nonceGroup) throws IOException {
+  private Result increment(final HRegion region, final OperationQuota quota,
+      final MutationProto mutation, final CellScanner cells, long nonceGroup)
+      throws IOException {
     long before = EnvironmentEdgeManager.currentTime();
     Increment increment = ProtobufUtil.toIncrement(mutation, cells);
+    quota.addMutation(increment);
     Result r = null;
     if (region.getCoprocessorHost() != null) {
       r = region.getCoprocessorHost().preIncrement(increment);
@@ -468,7 +474,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @return Return the <code>cellScanner</code> passed
    */
   private List<CellScannable> doNonAtomicRegionMutation(final HRegion region,
-      final RegionAction actions, final CellScanner cellScanner,
+      final OperationQuota quota, final RegionAction actions, final CellScanner cellScanner,
       final RegionActionResult.Builder builder, List<CellScannable> cellsToReturn, long nonceGroup) {
     // Gather up CONTIGUOUS Puts and Deletes in this mutations List.  Idea is that rather than do
     // one at a time, we instead pass them in batch.  Be aware that the corresponding
@@ -501,15 +507,15 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           if (type != MutationType.PUT && type != MutationType.DELETE && mutations != null &&
               !mutations.isEmpty()) {
             // Flush out any Puts or Deletes already collected.
-            doBatchOp(builder, region, mutations, cellScanner);
+            doBatchOp(builder, region, quota, mutations, cellScanner);
             mutations.clear();
           }
           switch (type) {
           case APPEND:
-            r = append(region, action.getMutation(), cellScanner, nonceGroup);
+            r = append(region, quota, action.getMutation(), cellScanner, nonceGroup);
             break;
           case INCREMENT:
-            r = increment(region, action.getMutation(), cellScanner,  nonceGroup);
+            r = increment(region, quota, action.getMutation(), cellScanner,  nonceGroup);
             break;
           case PUT:
           case DELETE:
@@ -554,7 +560,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
     // Finish up any outstanding mutations
     if (mutations != null && !mutations.isEmpty()) {
-      doBatchOp(builder, region, mutations, cellScanner);
+      doBatchOp(builder, region, quota, mutations, cellScanner);
     }
     return cellsToReturn;
   }
@@ -567,7 +573,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * @param mutations
    */
   private void doBatchOp(final RegionActionResult.Builder builder, final HRegion region,
-      final List<ClientProtos.Action> mutations, final CellScanner cells) {
+      final OperationQuota quota, final List<ClientProtos.Action> mutations,
+      final CellScanner cells) {
     Mutation[] mArray = new Mutation[mutations.size()];
     long before = EnvironmentEdgeManager.currentTime();
     boolean batchContainsPuts = false, batchContainsDelete = false;
@@ -584,6 +591,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           batchContainsDelete = true;
         }
         mArray[i++] = mutation;
+        quota.addMutation(mutation);
       }
 
       if (!region.getRegionInfo().isMetaTable()) {
@@ -805,6 +813,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     return regionServer.getConfiguration();
   }
 
+  /** Convenience accessor for the RegionServer's quota manager. */
+  private RegionServerQuotaManager getQuotaManager() {
+    return regionServer.getRegionServerQuotaManager();
+  }
+
   void start() {
     rpcServer.start();
   }
@@ -1608,6 +1620,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public GetResponse get(final RpcController controller,
       final GetRequest request) throws ServiceException {
     long before = EnvironmentEdgeManager.currentTime();
+    OperationQuota quota = null;
     try {
       checkOpen();
       requestCount.increment();
@@ -1618,6 +1631,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Boolean existence = null;
       Result r = null;
 
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.GET);
+
       if (get.hasClosestRowBefore() && get.getClosestRowBefore()) {
         if (get.getColumnCount() != 1) {
           throw new DoNotRetryIOException(
@@ -1647,10 +1662,13 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         ClientProtos.Result pbr =
             ProtobufUtil.toResult(existence, region.getRegionInfo().getReplicaId() != 0);
         builder.setResult(pbr);
-      } else  if (r != null) {
+      } else if (r != null) {
         ClientProtos.Result pbr = ProtobufUtil.toResult(r);
         builder.setResult(pbr);
       }
+      if (r != null) {
+        quota.addGetResult(r);
+      }
       return builder.build();
     } catch (IOException ie) {
       throw new ServiceException(ie);
@@ -1659,6 +1677,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         regionServer.metricsRegionServer.updateGet(
           EnvironmentEdgeManager.currentTime() - before);
       }
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 
@@ -1693,10 +1714,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
 
     for (RegionAction regionAction : request.getRegionActionList()) {
       this.requestCount.add(regionAction.getActionCount());
+      OperationQuota quota;
       HRegion region;
       regionActionResultBuilder.clear();
       try {
         region = getRegion(regionAction.getRegion());
+        quota = getQuotaManager().checkQuota(region, regionAction.getActionList());
       } catch (IOException e) {
         regionActionResultBuilder.setException(ResponseConverter.buildException(e));
         responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
@@ -1714,10 +1737,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       } else {
         // doNonAtomicRegionMutation manages the exception internally
-        cellsToReturn = doNonAtomicRegionMutation(region, regionAction, cellScanner,
+        cellsToReturn = doNonAtomicRegionMutation(region, quota, regionAction, cellScanner,
             regionActionResultBuilder, cellsToReturn, nonceGroup);
       }
       responseBuilder.addRegionActionResult(regionActionResultBuilder.build());
+      quota.close();
     }
     // Load the controller with the Cells to return.
     if (cellsToReturn != null && !cellsToReturn.isEmpty() && controller != null) {
@@ -1740,6 +1764,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     // It is also the conduit via which we pass back data.
     PayloadCarryingRpcController controller = (PayloadCarryingRpcController)rpcc;
     CellScanner cellScanner = controller != null? controller.cellScanner(): null;
+    OperationQuota quota = null;
     // Clear scanner so we are not holding on to reference across call.
     if (controller != null) controller.setCellScanner(null);
     try {
@@ -1756,17 +1781,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       Result r = null;
       Boolean processed = null;
       MutationType type = mutation.getMutateType();
+      long mutationSize = 0;
+
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.MUTATE);
+
       switch (type) {
       case APPEND:
         // TODO: this doesn't actually check anything.
-        r = append(region, mutation, cellScanner, nonceGroup);
+        r = append(region, quota, mutation, cellScanner, nonceGroup);
         break;
       case INCREMENT:
         // TODO: this doesn't actually check anything.
-        r = increment(region, mutation, cellScanner, nonceGroup);
+        r = increment(region, quota, mutation, cellScanner, nonceGroup);
         break;
       case PUT:
         Put put = ProtobufUtil.toPut(mutation, cellScanner);
+        quota.addMutation(put);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -1795,6 +1825,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         break;
       case DELETE:
         Delete delete = ProtobufUtil.toDelete(mutation, cellScanner);
+        quota.addMutation(delete);
         if (request.hasCondition()) {
           Condition condition = request.getCondition();
           byte[] row = condition.getRow().toByteArray();
@@ -1825,12 +1856,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           throw new DoNotRetryIOException(
             "Unsupported mutate type: " + type.name());
       }
-      if (processed != null) builder.setProcessed(processed.booleanValue());
+      if (processed != null) {
+        builder.setProcessed(processed.booleanValue());
+      }
       addResult(builder, r, controller);
       return builder.build();
     } catch (IOException ie) {
       regionServer.checkFileSystem();
       throw new ServiceException(ie);
+    } finally {
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 
@@ -1844,6 +1881,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   @Override
   public ScanResponse scan(final RpcController controller, final ScanRequest request)
   throws ServiceException {
+    OperationQuota quota = null;
     Leases.Lease lease = null;
     String scannerName = null;
     try {
@@ -1926,6 +1964,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         ttl = this.scannerLeaseTimeoutPeriod;
       }
 
+      quota = getQuotaManager().checkQuota(region, OperationQuota.OperationType.SCAN);
+      long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
+
       if (rows > 0) {
         // if nextCallSeq does not match throw Exception straight away. This needs to be
         // performed even before checking of Lease.
@@ -1972,9 +2013,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           }
 
           if (!done) {
-            long maxResultSize = scanner.getMaxResultSize();
+            long maxResultSize = Math.min(scanner.getMaxResultSize(), maxQuotaResultSize);
             if (maxResultSize <= 0) {
-              maxResultSize = maxScannerResultSize;
+              maxResultSize = maxQuotaResultSize;
             }
             List<Cell> values = new ArrayList<Cell>();
             region.startRegionOperation(Operation.SCAN);
@@ -2017,6 +2058,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             }
           }
 
+          quota.addScanResult(results);
+
           // If the scanner's filter - if any - is done with the scan
           // and wants to tell the client to stop the scan. This is done by passing
           // a null result, and setting moreResults to false.
@@ -2026,7 +2069,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           } else {
             addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
           }
-        } finally { 
+        } finally {
           // We're done. On way out re-add the above removed lease.
           // Adding resets expiration time on lease.
           if (scanners.containsKey(scannerName)) {
@@ -2073,6 +2116,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       }
       throw new ServiceException(ie);
+    } finally {
+      if (quota != null) {
+        quota.close();
+      }
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
index 63dd003..9d5280e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
@@ -20,16 +20,19 @@ package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.zookeeper.KeeperException;
 
 /**
@@ -68,6 +71,11 @@ public interface RegionServerServices
   TableLockManager getTableLockManager();
 
   /**
+   * @return RegionServer's instance of {@link RegionServerQuotaManager}
+   */
+  RegionServerQuotaManager getRegionServerQuotaManager();
+
+  /**
    * Tasks to perform after region open to complete deploy of region on
    * regionserver
    *
@@ -124,4 +132,9 @@ public interface RegionServerServices
    * @return The RegionServer's NonceManager
    */
   public ServerNonceManager getNonceManager();
+
+  /**
+   * @return all the online tables in this RS
+   */
+  Set<TableName> getOnlineTables();
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
index 14aabc2..1d3693f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java
@@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.protobuf.ResponseConverter;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@@ -2228,4 +2229,34 @@ public class AccessController extends BaseMasterAndRegionObserver
   @Override
   public void postRollWALWriterRequest(ObserverContext<RegionServerCoprocessorEnvironment> ctx)
       throws IOException { }
+
+  @Override
+  /** Requires global ADMIN permission to set a user-level quota. */
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final Quotas quotas) throws IOException {
+    requirePermission("setUserQuota", Action.ADMIN);
+  }
+
+  @Override
+  /** Requires ADMIN on the table (table-scoped check) to set a user+table quota. */
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    requirePermission("setUserTableQuota", tableName, null, null, Action.ADMIN);
+  }
+
+  @Override
+  /**
+   * Requires global ADMIN to set a user+namespace quota.
+   * NOTE(review): this is a global check, not namespace-scoped like the
+   * table variant above — confirm that is intentional.
+   */
+  public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String userName, final String namespace, final Quotas quotas) throws IOException {
+    requirePermission("setUserNamespaceQuota", Action.ADMIN);
+  }
+
+  @Override
+  /** Requires ADMIN on the table to set a table quota. */
+  public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final TableName tableName, final Quotas quotas) throws IOException {
+    requirePermission("setTableQuota", tableName, null, null, Action.ADMIN);
+  }
+
+  @Override
+  /** Requires global ADMIN to set a namespace quota. */
+  public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+      final String namespace, final Quotas quotas) throws IOException {
+    requirePermission("setNamespaceQuota", Action.ADMIN);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
index 09531d1..d1260d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java
@@ -94,6 +94,7 @@ public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements
 
     public E poll() {
       E elem = objects[head];
+      objects[head] = null;
       head = (head + 1) % objects.length;
       if (head == 0) tail = 0;
       return elem;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
index 14e8d1f..a597bb7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
@@ -22,6 +22,7 @@ import java.net.InetSocketAddress;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -83,11 +85,17 @@ class MockRegionServerServices implements RegionServerServices {
     return this.regions.get(encodedRegionName);
   }
 
+  @Override
   public List<HRegion> getOnlineRegions(TableName tableName) throws IOException {
     return null;
   }
 
   @Override
+  public Set<TableName> getOnlineTables() {
+    return null;
+  }
+
+  @Override
   public void addToOnlineRegions(HRegion r) {
     this.regions.put(r.getRegionInfo().getEncodedName(), r);
   }
@@ -147,6 +155,7 @@ class MockRegionServerServices implements RegionServerServices {
     return null;
   }
 
+  @Override
   public RegionServerAccounting getRegionServerAccounting() {
     return null;
   }
@@ -157,6 +166,11 @@ class MockRegionServerServices implements RegionServerServices {
   }
 
   @Override
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return null;
+  }
+
+  @Override
   public ServerName getServerName() {
     return this.serverName;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
index 84bccf2..e4fe3ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMasterObserver.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
+import org.apache.hadoop.hbase.protobuf.generated.QuotaProtos.Quotas;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -657,11 +658,11 @@ public class TestMasterObserver {
         ObserverContext<MasterCoprocessorEnvironment> ctx) throws IOException {
       preMasterInitializationCalled = true;
     }
-    
+
     public boolean wasMasterInitializationCalled(){
       return preMasterInitializationCalled;
     }
-    
+
     @Override
     public void postStartMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
         throws IOException {
@@ -1009,6 +1010,56 @@ public class TestMasterObserver {
     public void postTableFlush(ObserverContext<MasterCoprocessorEnvironment> ctx,
         TableName tableName) throws IOException {
     }
+
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetUserQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String userName, final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetTableQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final TableName tableName, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void preSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String namespace, final Quotas quotas) throws IOException {
+    }
+
+    @Override
+    public void postSetNamespaceQuota(final ObserverContext<MasterCoprocessorEnvironment> ctx,
+        final String namespace, final Quotas quotas) throws IOException {
+    }
   }
 
   private static HBaseTestingUtility UTIL = new HBaseTestingUtility();

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 9971c4c..73082f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -85,6 +86,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.quotas.RegionServerQuotaManager;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -313,10 +315,15 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
     return null;
   }
 
+  @Override
   public TableLockManager getTableLockManager() {
     return new NullTableLockManager();
   }
 
+  public RegionServerQuotaManager getRegionServerQuotaManager() {
+    return null;
+  }
+
   @Override
   public void postOpenDeployTasks(HRegion r)
       throws KeeperException, IOException {
@@ -515,6 +522,12 @@ ClientProtos.ClientService.BlockingInterface, RegionServerServices {
   }
 
   @Override
+  public Set<TableName> getOnlineTables() {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
   public Leases getLeases() {
     // TODO Auto-generated method stub
     return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 0103639..e084043 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.RegionActionResult;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ResultOrException;
+import org.apache.hadoop.hbase.quotas.MasterQuotaManager;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.testclassification.MasterTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -240,6 +241,11 @@ public class TestCatalogJanitor {
     }
 
     @Override
+    public MasterQuotaManager getMasterQuotaManager() {
+      return null;
+    }
+
+    @Override
     public ServerManager getServerManager() {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/bd8df9cc/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
new file mode 100644
index 0000000..622cac2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestQuotaAdmin.java
@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.quotas;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * minicluster tests that validate that quota  entries are properly set in the quota table
+ */
+@Category({ClientTests.class, MediumTests.class})
+public class TestQuotaAdmin {
+  final Log LOG = LogFactory.getLog(getClass());
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
+    TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, 2000);
+    TEST_UTIL.getConfiguration().setInt("hbase.hstore.compactionThreshold", 10);
+    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
+    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
+    TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME.getName());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSimpleScan() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    String userName = User.getCurrent().getShortName();
+
+    admin.setQuota(QuotaSettingsFactory
+      .throttleUser(userName, ThrottleType.REQUEST_NUMBER, 6, TimeUnit.MINUTES));
+    admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, true));
+
+    QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration());
+    try {
+      int countThrottle = 0;
+      int countGlobalBypass = 0;
+      for (QuotaSettings settings: scanner) {
+        LOG.debug(settings);
+        switch (settings.getQuotaType()) {
+          case THROTTLE:
+            ThrottleSettings throttle = (ThrottleSettings)settings;
+            assertEquals(userName, throttle.getUserName());
+            assertEquals(null, throttle.getTableName());
+            assertEquals(null, throttle.getNamespace());
+            assertEquals(6, throttle.getSoftLimit());
+            assertEquals(TimeUnit.MINUTES, throttle.getTimeUnit());
+            countThrottle++;
+            break;
+          case GLOBAL_BYPASS:
+            countGlobalBypass++;
+            break;
+          default:
+            fail("unexpected settings type: " + settings.getQuotaType());
+        }
+      }
+      assertEquals(1, countThrottle);
+      assertEquals(1, countGlobalBypass);
+    } finally {
+      scanner.close();
+    }
+
+    admin.setQuota(QuotaSettingsFactory.unthrottleUser(userName));
+    assertNumResults(1, null);
+    admin.setQuota(QuotaSettingsFactory.bypassGlobals(userName, false));
+    assertNumResults(0, null);
+  }
+
+  @Test
+  public void testQuotaRetrieverFilter() throws Exception {
+    Admin admin = TEST_UTIL.getHBaseAdmin();
+    TableName[] tables = new TableName[] {
+      TableName.valueOf("T0"), TableName.valueOf("T01"), TableName.valueOf("NS0:T2"),
+    };
+    String[] namespaces = new String[] { "NS0", "NS01", "NS2" };
+    String[] users = new String[] { "User0", "User01", "User2" };
+
+    for (String user: users) {
+      admin.setQuota(QuotaSettingsFactory
+        .throttleUser(user, ThrottleType.REQUEST_NUMBER, 1, TimeUnit.MINUTES));
+
+      for (TableName table: tables) {
+        admin.setQuota(QuotaSettingsFactory
+          .throttleUser(user, table, ThrottleType.REQUEST_NUMBER, 2, TimeUnit.MINUTES));
+      }
+
+      for (String ns: namespaces) {
+        admin.setQuota(QuotaSettingsFactory
+          .throttleUser(user, ns, ThrottleType.REQUEST_NUMBER, 3, TimeUnit.MINUTES));
+      }
+    }
+    assertNumResults(21, null);
+
+    for (TableName table: tables) {
+      admin.setQuota(QuotaSettingsFactory
+        .throttleTable(table, ThrottleType.REQUEST_NUMBER, 4, TimeUnit.MINUTES));
+    }
+    assertNumResults(24, null);
+
+    for (String ns: namespaces) {
+      admin.setQuota(QuotaSettingsFactory
+        .throttleNamespace(ns, ThrottleType.REQUEST_NUMBER, 5, TimeUnit.MINUTES));
+    }
+    assertNumResults(27, null);
+
+    assertNumResults(7, new QuotaFilter().setUserFilter("User0"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User"));
+    assertNumResults(21, new QuotaFilter().setUserFilter("User.*"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("T0"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setTableFilter("NS.*"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setTableFilter("T"));
+    assertNumResults(6, new QuotaFilter().setUserFilter("User.*").setTableFilter("T.*"));
+    assertNumResults(3, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS0"));
+    assertNumResults(0, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS"));
+    assertNumResults(9, new QuotaFilter().setUserFilter("User.*").setNamespaceFilter("NS.*"));
+    assertNumResults(6, new QuotaFilter().setUserFilter("User.*")
+                                            .setTableFilter("T0").setNamespaceFilter("NS0"));
+    assertNumResults(1, new QuotaFilter().setTableFilter("T0"));
+    assertNumResults(0, new QuotaFilter().setTableFilter("T"));
+    assertNumResults(2, new QuotaFilter().setTableFilter("T.*"));
+    assertNumResults(3, new QuotaFilter().setTableFilter(".*T.*"));
+    assertNumResults(1, new QuotaFilter().setNamespaceFilter("NS0"));
+    assertNumResults(0, new QuotaFilter().setNamespaceFilter("NS"));
+    assertNumResults(3, new QuotaFilter().setNamespaceFilter("NS.*"));
+
+    for (String user: users) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleUser(user));
+      for (TableName table: tables) {
+        admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, table));
+      }
+      for (String ns: namespaces) {
+        admin.setQuota(QuotaSettingsFactory.unthrottleUser(user, ns));
+      }
+    }
+    assertNumResults(6, null);
+
+    for (TableName table: tables) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleTable(table));
+    }
+    assertNumResults(3, null);
+
+    for (String ns: namespaces) {
+      admin.setQuota(QuotaSettingsFactory.unthrottleNamespace(ns));
+    }
+    assertNumResults(0, null);
+  }
+
+  private void assertNumResults(int expected, final QuotaFilter filter) throws Exception {
+    assertEquals(expected, countResults(filter));
+  }
+
+  private int countResults(final QuotaFilter filter) throws Exception {
+    QuotaRetriever scanner = QuotaRetriever.open(TEST_UTIL.getConfiguration(), filter);
+    try {
+      int count = 0;
+      for (QuotaSettings settings: scanner) {
+        LOG.debug(settings);
+        count++;
+      }
+      return count;
+    } finally {
+      scanner.close();
+    }
+  }
+}
\ No newline at end of file


Mime
View raw message