hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From apurt...@apache.org
Subject [2/4] hbase git commit: HBASE-15816 Provide client with ability to set priority on Operations
Date Sat, 22 Jul 2017 00:52:00 GMT
HBASE-15816 Provide client with ability to set priority on Operations

Signed-off-by: Andrew Purtell <apurtell@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d461bec6
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d461bec6
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d461bec6

Branch: refs/heads/branch-2
Commit: d461bec6c2c6d4035dc6d2ad2cebe976eba24aef
Parents: 9462891
Author: rgidwani <rgidwani@salesforce.com>
Authored: Fri Jul 14 10:18:26 2017 -0700
Committer: Andrew Purtell <apurtell@apache.org>
Committed: Fri Jul 21 17:12:21 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Action.java  |  8 +++++++
 .../org/apache/hadoop/hbase/client/Append.java  |  6 +++++
 .../hadoop/hbase/client/AsyncProcess.java       | 17 +++++++++++---
 .../hbase/client/AsyncRequestFutureImpl.java    |  2 +-
 .../client/CancellableRegionServerCallable.java |  4 ++--
 .../hbase/client/ClientServiceCallable.java     |  5 ++--
 .../org/apache/hadoop/hbase/client/Delete.java  |  6 +++++
 .../org/apache/hadoop/hbase/client/Get.java     |  5 ++++
 .../org/apache/hadoop/hbase/client/HTable.java  | 20 ++++++++--------
 .../apache/hadoop/hbase/client/Increment.java   |  6 +++++
 .../apache/hadoop/hbase/client/MultiAction.java | 12 ++++++++++
 .../hbase/client/MultiServerCallable.java       |  4 ++--
 .../apache/hadoop/hbase/client/Mutation.java    |  5 +++-
 .../client/NoncedRegionServerCallable.java      |  4 ++--
 .../hbase/client/OperationWithAttributes.java   | 12 ++++++++++
 .../client/RegionCoprocessorRpcChannel.java     |  3 ++-
 .../hbase/client/RegionServerCallable.java      | 11 +++++++++
 .../hadoop/hbase/client/RowMutations.java       |  8 +++++++
 .../RpcRetryingCallerWithReadReplicas.java      |  4 ++--
 .../org/apache/hadoop/hbase/client/Scan.java    |  7 ++++++
 .../hadoop/hbase/client/ScannerCallable.java    |  2 +-
 .../hbase/client/SecureBulkLoadClient.java      |  7 +++---
 .../hadoop/hbase/ipc/HBaseRpcController.java    |  2 --
 .../hbase/ipc/HBaseRpcControllerImpl.java       |  7 +++---
 .../org/apache/hadoop/hbase/ipc/IPCUtil.java    |  3 ++-
 .../org/apache/hadoop/hbase/HConstants.java     |  1 +
 .../hbase/client/TestRpcControllerFactory.java  | 24 ++++++++++++++++++--
 ...gionServerBulkLoadWithOldSecureEndpoint.java |  5 ++--
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  3 +++
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  2 +-
 .../regionserver/wal/WALEditsReplaySink.java    |  2 +-
 .../org/apache/hadoop/hbase/client/TestHCM.java |  2 +-
 .../hbase/client/TestReplicaWithCluster.java    |  2 +-
 .../apache/hadoop/hbase/io/TestHeapSize.java    |  2 ++
 .../TestLoadIncrementalHFilesSplitRecovery.java |  2 +-
 .../hadoop/hbase/quotas/TestSpaceQuotas.java    |  3 ++-
 .../regionserver/TestHRegionServerBulkLoad.java |  5 ++--
 .../TestHRegionServerBulkLoadWithOldClient.java |  5 ++--
 38 files changed, 178 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
index ef05912..f4b696a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Action.java
@@ -32,10 +32,16 @@ public class Action implements Comparable<Action> {
   private final int originalIndex;
   private long nonce = HConstants.NO_NONCE;
   private int replicaId = RegionReplicaUtil.DEFAULT_REPLICA_ID;
+  private int priority;
 
   public Action(Row action, int originalIndex) {
+    this(action, originalIndex, HConstants.PRIORITY_UNSET);
+  }
+
+  public Action(Row action, int originalIndex, int priority) {
     this.action = action;
     this.originalIndex = originalIndex;
+    this.priority = priority;
   }
 
   /**
@@ -70,6 +76,8 @@ public class Action implements Comparable<Action> {
     return replicaId;
   }
 
+  public int getPriority() { return priority; }
+
   @Override
   public int compareTo(Action other) {
     return action.compareTo(other.getAction());

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 346eb0e..02ec770 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -84,6 +84,7 @@ public class Append extends Mutation {
     for (Map.Entry<String, byte[]> entry : a.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    this.setPriority(a.getPriority());
   }
 
   /** Create a Append operation for the specified row.
@@ -184,6 +185,11 @@ public class Append extends Mutation {
   }
 
   @Override
+  public Append setPriority(int priority) {
+    return (Append) super.setPriority(priority);
+  }
+
+  @Override
   public Append setTTL(long ttl) {
     return (Append) super.setTTL(ttl);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 22efdaa..8693b3c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -291,7 +291,12 @@ class AsyncProcess {
           LOG.error("Failed to get region location ", ex);
           // This action failed before creating ars. Retain it, but do not add to submit list.
           // We will then add it to ars in an already-failed state.
-          retainedActions.add(new Action(r, ++posInList));
+
+          int priority = HConstants.NORMAL_QOS;
+          if (r instanceof Mutation) {
+            priority = ((Mutation) r).getPriority();
+          }
+          retainedActions.add(new Action(r, ++posInList, priority));
           locationErrors.add(ex);
           locationErrorRows.add(posInList);
           it.remove();
@@ -302,7 +307,11 @@ class AsyncProcess {
           break;
         }
         if (code == ReturnCode.INCLUDE) {
-          Action action = new Action(r, ++posInList);
+          int priority = HConstants.NORMAL_QOS;
+          if (r instanceof Mutation) {
+            priority = ((Mutation) r).getPriority();
+          }
+          Action action = new Action(r, ++posInList, priority);
           setNonce(ng, r, action);
           retainedActions.add(action);
           // TODO: replica-get is not supported on this path
@@ -372,6 +381,7 @@ class AsyncProcess {
     // The position will be used by the processBatch to match the object array returned.
     int posInList = -1;
     NonceGenerator ng = this.connection.getNonceGenerator();
+    int highestPriority = HConstants.PRIORITY_UNSET;
     for (Row r : rows) {
       posInList++;
       if (r instanceof Put) {
@@ -379,8 +389,9 @@ class AsyncProcess {
         if (put.isEmpty()) {
           throw new IllegalArgumentException("No columns to insert for #" + (posInList+1)+ " item");
         }
+        highestPriority = Math.max(put.getPriority(), highestPriority);
       }
-      Action action = new Action(r, posInList);
+      Action action = new Action(r, posInList, highestPriority);
       setNonce(ng, r, action);
       actions.add(action);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 710ec91..5a5a3e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -1267,6 +1267,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   private MultiServerCallable createCallable(final ServerName server, TableName tableName,
       final MultiAction multi) {
     return new MultiServerCallable(asyncProcess.connection, tableName, server,
-        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
+        multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker, multi.getPriority());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
index a0ff900..c0e64e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -40,8 +40,8 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
   private final RetryingTimeTracker tracker;
   private final int rpcTimeout;
   CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
-      RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
-    super(connection, tableName, row, rpcController);
+      RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+    super(connection, tableName, row, rpcController, priority);
     this.rpcTimeout = rpcTimeout;
     this.tracker = tracker;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
index 5fa8de1..00e9558 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientServiceCallable.java
@@ -33,9 +33,10 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 @InterfaceAudience.Private
 public abstract class ClientServiceCallable<T> extends
     RegionServerCallable<T, ClientProtos.ClientService.BlockingInterface> {
+
   public ClientServiceCallable(Connection connection, TableName tableName, byte [] row,
-      RpcController rpcController) {
-    super(connection, tableName, row, rpcController);
+      RpcController rpcController, int priority) {
+    super(connection, tableName, row, rpcController, priority);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 0b3769d..351d8a6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -147,6 +147,7 @@ public class Delete extends Mutation implements Comparable<Row> {
     for (Map.Entry<String, byte[]> entry : d.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    super.setPriority(d.getPriority());
   }
 
   /**
@@ -369,4 +370,9 @@ public class Delete extends Mutation implements Comparable<Row> {
   public Delete setTTL(long ttl) {
     throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
   }
+
+  @Override
+  public Delete setPriority(int priority) {
+    return (Delete) super.setPriority(priority);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index c3ddc4b..b774a9a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -127,6 +127,7 @@ public class Get extends Query
       TimeRange tr = entry.getValue();
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
+    super.setPriority(get.getPriority());
   }
 
   /**
@@ -552,4 +553,8 @@ public class Get extends Query
       return (Get) super.setIsolationLevel(level);
   }
 
+  @Override
+  public Get setPriority(int priority) {
+    return (Get) super.setPriority(priority);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index a48b9e0..c0d321b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -415,7 +415,7 @@ public class HTable implements Table {
     if (get.getConsistency() == Consistency.STRONG) {
       final Get configuredGet = get;
       ClientServiceCallable<Result> callable = new ClientServiceCallable<Result>(this.connection, getName(),
-          get.getRow(), this.rpcControllerFactory.newController()) {
+          get.getRow(), this.rpcControllerFactory.newController(), get.getPriority()) {
         @Override
         protected Result rpcCall() throws Exception {
           ClientProtos.GetRequest request = RequestConverter.buildGetRequest(
@@ -547,7 +547,7 @@ public class HTable implements Table {
     CancellableRegionServerCallable<SingleResponse> callable =
         new CancellableRegionServerCallable<SingleResponse>(
             connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
-            writeRpcTimeout, new RetryingTimeTracker().start()) {
+            writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
       @Override
       protected SingleResponse rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
@@ -624,7 +624,7 @@ public class HTable implements Table {
   public void mutateRow(final RowMutations rm) throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
-          rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
+          rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()){
       @Override
       protected MultiResponse rpcCall() throws Exception {
         RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@@ -668,7 +668,7 @@ public class HTable implements Table {
     checkHasFamilies(append);
     NoncedRegionServerCallable<Result> callable =
         new NoncedRegionServerCallable<Result>(this.connection, getName(), append.getRow(),
-            this.rpcControllerFactory.newController()) {
+            this.rpcControllerFactory.newController(), append.getPriority()) {
       @Override
       protected Result rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
@@ -690,7 +690,7 @@ public class HTable implements Table {
     checkHasFamilies(increment);
     NoncedRegionServerCallable<Result> callable =
         new NoncedRegionServerCallable<Result>(this.connection, getName(), increment.getRow(),
-            this.rpcControllerFactory.newController()) {
+            this.rpcControllerFactory.newController(), increment.getPriority()) {
       @Override
       protected Result rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
@@ -734,7 +734,7 @@ public class HTable implements Table {
 
     NoncedRegionServerCallable<Long> callable =
         new NoncedRegionServerCallable<Long>(this.connection, getName(), row,
-            this.rpcControllerFactory.newController()) {
+            this.rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
       @Override
       protected Long rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildIncrementRequest(
@@ -758,7 +758,7 @@ public class HTable implements Table {
       final Put put)
   throws IOException {
     ClientServiceCallable<Boolean> callable = new ClientServiceCallable<Boolean>(this.connection, getName(), row,
-        this.rpcControllerFactory.newController()) {
+        this.rpcControllerFactory.newController(), put.getPriority()) {
       @Override
       protected Boolean rpcCall() throws Exception {
         MutateRequest request = RequestConverter.buildMutateRequest(
@@ -782,7 +782,7 @@ public class HTable implements Table {
   throws IOException {
     ClientServiceCallable<Boolean> callable =
         new ClientServiceCallable<Boolean>(this.connection, getName(), row,
-            this.rpcControllerFactory.newController()) {
+            this.rpcControllerFactory.newController(), put.getPriority()) {
       @Override
       protected Boolean rpcCall() throws Exception {
         CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -817,7 +817,7 @@ public class HTable implements Table {
     CancellableRegionServerCallable<SingleResponse> callable =
         new CancellableRegionServerCallable<SingleResponse>(
             this.connection, getName(), row, this.rpcControllerFactory.newController(),
-            writeRpcTimeout, new RetryingTimeTracker().start()) {
+            writeRpcTimeout, new RetryingTimeTracker().start(), delete.getPriority()) {
       @Override
       protected SingleResponse rpcCall() throws Exception {
         CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -858,7 +858,7 @@ public class HTable implements Table {
     throws IOException {
     CancellableRegionServerCallable<MultiResponse> callable =
       new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
-        rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
+        rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start(), rm.getMaxPriority()) {
         @Override
         protected MultiResponse rpcCall() throws Exception {
           CompareType compareType = CompareType.valueOf(compareOp.name());

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index 4ba0efa..d323555 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -84,6 +84,7 @@ public class Increment extends Mutation implements Comparable<Row> {
     for (Map.Entry<String, byte[]> entry : i.getAttributesMap().entrySet()) {
       this.setAttribute(entry.getKey(), entry.getValue());
     }
+    super.setPriority(i.getPriority());
   }
 
   /**
@@ -331,4 +332,9 @@ public class Increment extends Mutation implements Comparable<Row> {
   public Increment setTTL(long ttl) {
     return (Increment) super.setTTL(ttl);
   }
+
+  @Override
+  public Increment setPriority(int priority) {
+    return (Increment) super.setPriority(priority);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
index a4aa71d..bcec395 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
@@ -20,11 +20,16 @@ package org.apache.hadoop.hbase.client;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.function.Consumer;
+import java.util.function.Predicate;
 
+import com.google.common.collect.Iterables;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -103,4 +108,11 @@ public final class MultiAction {
   public long getNonceGroup() {
     return this.nonceGroup;
   }
+
+  // returns the max priority of all the actions
+  public int getPriority() {
+    Optional<Action> result = actions.values().stream().flatMap(List::stream)
+        .max((action1, action2) -> Math.max(action1.getPriority(), action2.getPriority()));
+    return result.isPresent() ? result.get().getPriority() : HConstants.PRIORITY_UNSET;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 64dada0..33c9a0b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -55,8 +55,8 @@ class MultiServerCallable extends CancellableRegionServerCallable<MultiResponse>
 
   MultiServerCallable(final ClusterConnection connection, final TableName tableName,
       final ServerName location, final MultiAction multi, RpcController rpcController,
-      int rpcTimeout, RetryingTimeTracker tracker) {
-    super(connection, tableName, null, rpcController, rpcTimeout, tracker);
+      int rpcTimeout, RetryingTimeTracker tracker, int priority) {
+    super(connection, tableName, null, rpcController, rpcTimeout, tracker, priority);
     this.multiAction = multi;
     // RegionServerCallable has HRegionLocation field, but this is a multi-region request.
     // Using region info from parent HRegionLocation would be a mistake for this class; so

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index f6cb4b1..3b60497 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -71,7 +71,10 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
       // familyMap
       ClassSize.REFERENCE +
       // familyMap
-      ClassSize.TREEMAP);
+      ClassSize.TREEMAP +
+      // priority
+      ClassSize.INTEGER
+  );
 
   /**
    * The attribute for storing the list of clusters that have consumed the change.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
index 52ed263..5dc19f6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -47,8 +47,8 @@ public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallabl
    * @param row The row we want in <code>tableName</code>.
    */
   public NoncedRegionServerCallable(Connection connection, TableName tableName, byte [] row,
-      HBaseRpcController rpcController) {
-    super(connection, tableName, row, rpcController);
+      HBaseRpcController rpcController, int priority) {
+    super(connection, tableName, row, rpcController, priority);
     this.nonce = getConnection().getNonceGenerator().newNonce();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
index ba21cbb..1fb691a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/OperationWithAttributes.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -34,6 +35,7 @@ public abstract class OperationWithAttributes extends Operation implements Attri
 
   // used for uniquely identifying an operation
   public static final String ID_ATRIBUTE = "_operation.attributes.id";
+  private int priority = HConstants.PRIORITY_UNSET;
 
   @Override
   public OperationWithAttributes setAttribute(String name, byte[] value) {
@@ -108,4 +110,14 @@ public abstract class OperationWithAttributes extends Operation implements Attri
     byte[] attr = getAttribute(ID_ATRIBUTE);
     return attr == null? null: Bytes.toString(attr);
   }
+
+  public OperationWithAttributes setPriority(int priority) {
+    this.priority = priority;
+    return this;
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
index 3b10549..df7d74f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionCoprocessorRpcChannel.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
@@ -77,7 +78,7 @@ class RegionCoprocessorRpcChannel extends SyncCoprocessorRpcChannel {
     }
     ClientServiceCallable<CoprocessorServiceResponse> callable =
       new ClientServiceCallable<CoprocessorServiceResponse>(this.conn,
-              this.table, this.row, this.conn.getRpcControllerFactory().newController()) {
+              this.table, this.row, this.conn.getRpcControllerFactory().newController(), HConstants.PRIORITY_UNSET) {
       @Override
       protected CoprocessorServiceResponse rpcCall() throws Exception {
         byte [] regionName = getLocation().getRegionInfo().getRegionName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
index fb593a3..499685d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegionServerCallable.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.client;
 import java.io.IOException;
 
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
@@ -66,6 +67,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
    * Can be null!
    */
   protected final RpcController rpcController;
+  private int priority = HConstants.NORMAL_QOS;
 
   /**
    * @param connection Connection to use.
@@ -75,11 +77,17 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
    */
   public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
       RpcController rpcController) {
+    this(connection, tableName, row, rpcController, HConstants.NORMAL_QOS);
+  }
+
+  public RegionServerCallable(Connection connection, TableName tableName, byte [] row,
+      RpcController rpcController, int priority) {
     super();
     this.connection = connection;
     this.tableName = tableName;
     this.row = row;
     this.rpcController = rpcController;
+    this.priority = priority;
   }
 
   protected RpcController getRpcController() {
@@ -111,6 +119,7 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
           // If it is an instance of HBaseRpcController, we can set priority on the controller based
           // off the tableName. Set call timeout too.
           hrc.setPriority(tableName);
+          hrc.setPriority(priority);
           hrc.setCallTimeout(callTimeout);
         }
       }
@@ -172,6 +181,8 @@ public abstract class RegionServerCallable<T, S> implements RetryingCallable<T>
     return this.row;
   }
 
+  protected int getPriority() { return this.priority;}
+
   public void throwable(Throwable t, boolean retrying) {
     if (location != null) {
       getConnection().updateCachedLocations(tableName, location.getRegionInfo().getRegionName(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
index a9384ac..a6d6d39 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RowMutations.java
@@ -118,4 +118,12 @@ public class RowMutations implements Row {
   public List<Mutation> getMutations() {
     return Collections.unmodifiableList(mutations);
   }
+
+  public int getMaxPriority() {
+    int maxPriority = Integer.MIN_VALUE;
+    for (Mutation mutation : mutations) {
+      maxPriority = Math.max(maxPriority, mutation.getPriority());
+    }
+    return maxPriority;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index b5cddde..3cd9b2f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -27,7 +27,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -45,6 +44,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
 
 /**
  * Caller that goes to replica if the primary region does no answer within a configurable
@@ -96,7 +96,7 @@ public class RpcRetryingCallerWithReadReplicas {
     public ReplicaRegionServerCallable(int id, HRegionLocation location) {
       super(RpcRetryingCallerWithReadReplicas.this.cConnection,
           RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
-          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
+          rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker(), PRIORITY_UNSET);
       this.id = id;
       this.location = location;
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index 639f43e..e84716f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -276,6 +276,7 @@ public class Scan extends Query {
     this.mvccReadPoint = scan.getMvccReadPoint();
     this.limit = scan.getLimit();
     this.needCursorResult = scan.isNeedCursorResult();
+    setPriority(scan.getPriority());
   }
 
   /**
@@ -306,6 +307,7 @@ public class Scan extends Query {
       setColumnFamilyTimeRange(entry.getKey(), tr.getMin(), tr.getMax());
     }
     this.mvccReadPoint = -1L;
+    setPriority(get.getPriority());
   }
 
   public boolean isGetScan() {
@@ -1060,6 +1062,11 @@ public class Scan extends Query {
     return (Scan) super.setIsolationLevel(level);
   }
 
+  @Override
+  public Scan setPriority(int priority) {
+    return (Scan) super.setPriority(priority);
+  }
+
   /**
    * Enable collection of {@link ScanMetrics}. For advanced users.
    * @param enabled Set to true to enable accumulating scan metrics

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 4227e41..bb8b185 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -117,7 +117,7 @@ public class ScannerCallable extends ClientServiceCallable<Result[]> {
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
       ScanMetrics scanMetrics, RpcControllerFactory rpcControllerFactory, int id) {
-    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController());
+    super(connection, tableName, scan.getStartRow(), rpcControllerFactory.newController(), scan.getPriority());
     this.id = id;
     this.scan = scan;
     this.scanMetrics = scanMetrics;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index c8d9738..aa9f645 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
@@ -39,6 +38,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpeci
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.security.token.Token;
 
+import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
+
 /**
  * Client proxy for SecureBulkLoadProtocol
  */
@@ -56,7 +57,7 @@ public class SecureBulkLoadClient {
     try {
       ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
           table.getName(), HConstants.EMPTY_START_ROW,
-          this.rpcControllerFactory.newController()) {
+          this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
         @Override
         protected String rpcCall() throws Exception {
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
@@ -79,7 +80,7 @@ public class SecureBulkLoadClient {
   public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
     try {
       ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
-          table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController()) {
+          table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
         @Override
         protected Void rpcCall() throws Exception {
           byte[] regionName = getLocation().getRegionInfo().getRegionName();

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
index 71ce70a..b925330 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcController.java
@@ -37,8 +37,6 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 @InterfaceAudience.Private
 public interface HBaseRpcController extends RpcController, CellScannable {
 
-  static final int PRIORITY_UNSET = -1;
-
   /**
    * Only used to send cells to rpc server, the returned cells should be set by
    * {@link #setDone(CellScanner)}.

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
index 8ceac64..64d91f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/HBaseRpcControllerImpl.java
@@ -56,7 +56,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
    * This is the ordained way of setting priorities going forward. We will be undoing the old
    * annotation-based mechanism.
    */
-  private int priority = PRIORITY_UNSET;
+  private int priority = HConstants.PRIORITY_UNSET;
 
   /**
    * They are optionally set on construction, cleared after we make the call, and then optionally
@@ -95,7 +95,8 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   @Override
   public void setPriority(int priority) {
-    this.priority = priority;
+    this.priority = Math.max(this.priority, priority);
+
   }
 
   @Override
@@ -106,7 +107,7 @@ public class HBaseRpcControllerImpl implements HBaseRpcController {
 
   @Override
   public int getPriority() {
-    return priority;
+    return priority < 0 ? HConstants.NORMAL_QOS : priority;
   }
 
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 6dab3b5..e0636eb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -29,6 +29,7 @@ import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
@@ -111,7 +112,7 @@ class IPCUtil {
       builder.setCellBlockMeta(cellBlockMeta);
     }
     // Only pass priority if there is one set.
-    if (call.priority != HBaseRpcController.PRIORITY_UNSET) {
+    if (call.priority != HConstants.PRIORITY_UNSET) {
       builder.setPriority(call.priority);
     }
     builder.setTimeout(call.timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index dfc140b..54e0eb8 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1113,6 +1113,7 @@ public final class HConstants {
    * handled by high priority handlers.
    */
   // normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS
+  public static final int PRIORITY_UNSET = -1;
   public static final int NORMAL_QOS = 0;
   public static final int REPLICATION_QOS = 5;
   public static final int REPLAY_QOS = 6;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
index a7709ee..848934c 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/client/TestRpcControllerFactory.java
@@ -28,6 +28,8 @@ import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.curator.shaded.com.google.common.collect.ConcurrentHashMultiset;
+import org.apache.curator.shaded.com.google.common.collect.Multiset;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.CellScanner;
@@ -76,6 +78,7 @@ public class TestRpcControllerFactory {
 
   public static class CountingRpcController extends DelegatingHBaseRpcController {
 
+    private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
     private static AtomicInteger INT_PRIORITY = new AtomicInteger();
     private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
 
@@ -85,8 +88,13 @@ public class TestRpcControllerFactory {
 
     @Override
     public void setPriority(int priority) {
+      int oldPriority = getPriority();
       super.setPriority(priority);
-      INT_PRIORITY.incrementAndGet();
+      int newPriority = getPriority();
+      if (newPriority != oldPriority) {
+        INT_PRIORITY.incrementAndGet();
+        GROUPED_PRIORITY.add(priority);
+      }
     }
 
     @Override
@@ -196,6 +204,14 @@ public class TestRpcControllerFactory {
     scanInfo.setSmall(false);
     counter = doScan(table, scanInfo, counter + 1);
 
+    // make sure we have no priority count
+    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 0);
+    // lets set a custom priority on a get
+    Get get = new Get(row);
+    get.setPriority(HConstants.ADMIN_QOS);
+    table.get(get);
+    verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
+
     table.close();
     connection.close();
   }
@@ -208,11 +224,15 @@ public class TestRpcControllerFactory {
   }
 
   int verifyCount(Integer counter) {
-    assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter.intValue());
+    assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
     assertEquals(0, CountingRpcController.INT_PRIORITY.get());
     return CountingRpcController.TABLE_PRIORITY.get() + 1;
   }
 
+  void verifyPriorityGroupCount(int priorityLevel, int count) {
+    assertEquals(count, CountingRpcController.GROUPED_PRIORITY.count(priorityLevel));
+  }
+
   @Test
   public void testFallbackToDefaultRpcControllerFactory() {
     Configuration conf = new Configuration(UTIL.getConfiguration());

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
index 2c38662..0d5c993 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldSecureEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
 import org.apache.hadoop.hbase.TableName;
@@ -108,7 +109,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       ClientServiceCallable<Void> callable =
           new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
-              rpcControllerFactory.newController()) {
+              rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
             @Override
             protected Void rpcCall() throws Exception {
               LOG.debug("Going to connect to server " + getLocation() + " for row " +
@@ -128,7 +129,7 @@ public class TestHRegionServerBulkLoadWithOldSecureEndpoint extends TestHRegionS
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
         callable = new ClientServiceCallable<Void>(conn, tableName, Bytes.toBytes("aaa"),
-            rpcControllerFactory.newController()) {
+            rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
           @Override
           protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 2ee2d7e..900861b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -155,6 +155,9 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
   public boolean dispatch(CallRunner callTask) throws InterruptedException {
     RpcCall call = callTask.getRpcCall();
     int level = priority.getPriority(call.getHeader(), call.getParam(), call.getRequestUser());
+    if (level == HConstants.PRIORITY_UNSET) {
+      level = HConstants.NORMAL_QOS;
+    }
     if (priorityExecutor != null && level > highPriorityLevel) {
       return priorityExecutor.dispatch(callTask);
     } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 4191aa8..7b4a353 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -530,7 +530,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     }
 
     return new ClientServiceCallable<byte[]>(conn,
-        tableName, first, rpcControllerFactory.newController()) {
+        tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
       @Override
       protected byte[] rpcCall() throws Exception {
         SecureBulkLoadClient secureClient = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
index f451207..c616a01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
@@ -183,7 +183,7 @@ public class WALEditsReplaySink {
     ReplayServerCallable(final Connection connection, RpcControllerFactory rpcControllerFactory,
         final TableName tableName, final HRegionLocation regionLoc, final List<Entry> entries) {
       super(connection, tableName, HConstants.EMPTY_BYTE_ARRAY,
-          rpcControllerFactory.newController());
+          rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET);
       this.entries = entries;
       setLocation(regionLoc);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 037a538..1ef6c60 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -668,7 +668,7 @@ public class TestHCM {
     TEST_UTIL.createTable(tableName, FAM_NAM);
     ClientServiceCallable<Object> regionServerCallable = new ClientServiceCallable<Object>(
         TEST_UTIL.getConnection(), tableName, ROW,
-        new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController()) {
+        new RpcControllerFactory(TEST_UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
       @Override
       protected Object rpcCall() throws Exception {
         return null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 437afaf..898f629 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -475,7 +475,7 @@ public class TestReplicaWithCluster {
         new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
     ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
         hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
-        new RpcControllerFactory(HTU.getConfiguration()).newController()) {
+        new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
       @Override
       protected Void rpcCall() throws Exception {
         LOG.debug("Going to connect to server " + getLocation() + " for row "

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
index bf74a9e..8a6e19b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java
@@ -471,6 +471,7 @@ public class TestHeapSize  {
     expected = ClassSize.estimateBase(cl, false);
     //The actual TreeMap is not included in the above calculation
     expected += ClassSize.align(ClassSize.TREEMAP);
+    expected += ClassSize.align(ClassSize.INTEGER); // priority
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);
@@ -481,6 +482,7 @@ public class TestHeapSize  {
     expected  = ClassSize.estimateBase(cl, false);
     //The actual TreeMap is not included in the above calculation
     expected += ClassSize.align(ClassSize.TREEMAP);
+    expected += ClassSize.align(ClassSize.INTEGER); // priority
     if (expected != actual) {
       ClassSize.estimateBase(cl, true);
       assertEquals(expected, actual);

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 32ebbd2..e1aa137 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -354,7 +354,7 @@ public class TestLoadIncrementalHFilesSplitRecovery {
             HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
           ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
               conn, tableName, first, new RpcControllerFactory(
-                  util.getConfiguration()).newController()) {
+                  util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
             @Override
             public byte[] rpcCall() throws Exception {
               throw new IOException("Error calling something on RegionServer");

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 23a55e2..e52b139 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
@@ -451,7 +452,7 @@ public class TestSpaceQuotas {
     Table table = conn.getTable(tn);
     final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
     return new ClientServiceCallable<Void>(conn,
-        tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController()) {
+        tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(), HConstants.PRIORITY_UNSET) {
       @Override
       public Void rpcCall() throws Exception {
         SecureBulkLoadClient secureClient = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index c17234e..b5304f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
@@ -205,7 +206,7 @@ public class TestHRegionServerBulkLoad {
           prepareBulkLoad(conn);
       ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
           tableName, Bytes.toBytes("aaa"),
-          new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+          new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
         @Override
         public Void rpcCall() throws Exception {
           LOG.debug("Going to connect to server " + getLocation() + " for row "
@@ -229,7 +230,7 @@ public class TestHRegionServerBulkLoad {
         // 5 * 50 = 250 open file handles!
         callable = new ClientServiceCallable<Void>(conn,
             tableName, Bytes.toBytes("aaa"),
-            new RpcControllerFactory(UTIL.getConfiguration()).newController()) {
+            new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
           @Override
           protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "

http://git-wip-us.apache.org/repos/asf/hbase/blob/d461bec6/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
index 2a1655d..7f486e4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoadWithOldClient.java
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
 import org.apache.hadoop.hbase.TableName;
@@ -94,7 +95,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       RpcControllerFactory rpcControllerFactory = new RpcControllerFactory(UTIL.getConfiguration());
       ClientServiceCallable<Void> callable =
           new ClientServiceCallable<Void>(conn, tableName,
-              Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+              Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
         @Override
         protected Void rpcCall() throws Exception {
           LOG.info("Non-secure old client");
@@ -114,7 +115,7 @@ public class TestHRegionServerBulkLoadWithOldClient extends TestHRegionServerBul
       if (numBulkLoads.get() % 5 == 0) {
         // 5 * 50 = 250 open file handles!
         callable = new ClientServiceCallable<Void>(conn, tableName,
-            Bytes.toBytes("aaa"), rpcControllerFactory.newController()) {
+            Bytes.toBytes("aaa"), rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
           @Override
           protected Void rpcCall() throws Exception {
             LOG.debug("compacting " + getLocation() + " for row "


Mime
View raw message