hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject svn commit: r1523108 - in /hbase/trunk: hbase-common/src/main/java/org/apache/hadoop/hbase/ hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-server/src/main/java/org/apache/hadoop/hbase/master/ hbase-server/src/main/java/org/apache/hadoop/...
Date Fri, 13 Sep 2013 21:18:43 GMT
Author: stack
Date: Fri Sep 13 21:18:42 2013
New Revision: 1523108

URL: http://svn.apache.org/r1523108
Log:
HBASE-9101 Addendum to pluggable RpcScheduler

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/QosFunction.java
Modified:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java Fri Sep 13 21:18:42 2013
@@ -38,6 +38,8 @@ import org.apache.hadoop.hbase.util.Byte
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public final class HConstants {
+  // NOTICE!!!! Please do not add a constants here, unless they are referenced by a lot of classes.
+
   //Bytes.UTF8_ENCODING should be updated if this changed
   /** When we encode strings, we always specify UTF8 encoding */
   public static final String UTF8_ENCODING = "UTF-8";
@@ -893,8 +895,6 @@ public final class HConstants {
   public static final String STATUS_MULTICAST_PORT = "hbase.status.multicast.port";
   public static final int DEFAULT_STATUS_MULTICAST_PORT = 60100;
 
-
-
   private HConstants() {
     // Can't be instantiated with this ctor.
   }

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DaemonThreadFactory;
+
+import java.io.IOException;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A very simple {@code }RpcScheduler} that serves incoming requests in order.
+ *
+ * This can be used for HMaster, where no prioritization is needed.
+ */
+public class FifoRpcScheduler implements RpcScheduler {
+
+  private final int handlerCount;
+  private final int maxQueueLength;
+  private ThreadPoolExecutor executor;
+
+  public FifoRpcScheduler(Configuration conf, int handlerCount) {
+    this.handlerCount = handlerCount;
+    this.maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
+        handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
+  }
+
+  @Override
+  public void init(Context context) {
+    // no-op
+  }
+
+  @Override
+  public void start() {
+    this.executor = new ThreadPoolExecutor(
+        handlerCount,
+        handlerCount,
+        60,
+        TimeUnit.SECONDS,
+        new ArrayBlockingQueue<Runnable>(maxQueueLength),
+        new DaemonThreadFactory("FifoRpcScheduler.handler"),
+        new ThreadPoolExecutor.CallerRunsPolicy());
+  }
+
+  @Override
+  public void stop() {
+    this.executor.shutdown();
+  }
+
+  @Override
+  public void dispatch(RpcServer.CallRunner task) throws IOException, InterruptedException {
+    executor.submit(task);
+  }
+
+  @Override
+  public int getGeneralQueueLength() {
+    return executor.getQueue().size();
+  }
+
+  @Override
+  public int getPriorityQueueLength() {
+    return 0;
+  }
+
+  @Override
+  public int getReplicationQueueLength() {
+    return 0;
+  }
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PriorityFunction.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.protobuf.Message;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+
+/**
+ * Function to figure priority of incoming request.
+ */
+public interface PriorityFunction {
+  /**
+   * @param header
+   * @param param
+   * @return Priority of this request.
+   */
+  int getPriority(RequestHeader header, Message param);
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java Fri Sep 13 21:18:42 2013
@@ -28,7 +28,7 @@ import java.net.InetSocketAddress;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-interface RpcScheduler {
+public interface RpcScheduler {
 
   /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
   interface Context {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java Fri Sep 13 21:18:42 2013
@@ -2165,6 +2165,7 @@ public class RpcServer implements RpcSer
     listener.interrupt();
     listener.doStop();
     responder.interrupt();
+    scheduler.stop();
     notifyAll();
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java Fri Sep 13 21:18:42 2013
@@ -17,20 +17,17 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
-import com.google.common.base.Function;
-import com.google.common.base.Strings;
-import com.google.common.collect.Lists;
-import com.google.protobuf.Message;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
-import org.apache.hadoop.hbase.util.Pair;
 
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.base.Strings;
+import com.google.common.collect.Lists;
 
 /**
  * A scheduler that maintains isolated handler pools for general, high-priority and replication
@@ -44,12 +41,12 @@ public class SimpleRpcScheduler implemen
   private final int handlerCount;
   private final int priorityHandlerCount;
   private final int replicationHandlerCount;
+  private final PriorityFunction priority;
   final BlockingQueue<RpcServer.CallRunner> callQueue;
   final BlockingQueue<RpcServer.CallRunner> priorityCallQueue;
   final BlockingQueue<RpcServer.CallRunner> replicationQueue;
   private volatile boolean running = false;
   private final List<Thread> handlers = Lists.newArrayList();
-  private final Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction;
 
   /** What level a high priority call is at. */
   private final int highPriorityLevel;
@@ -59,22 +56,22 @@ public class SimpleRpcScheduler implemen
    * @param handlerCount the number of handler threads that will be used to process calls
    * @param priorityHandlerCount How many threads for priority handling.
    * @param replicationHandlerCount How many threads for replication handling.
-   * @param qosFunction a function that maps requests to priorities
    * @param highPriorityLevel
+   * @param priority Function to extract request priority.
    */
   public SimpleRpcScheduler(
       Configuration conf,
       int handlerCount,
       int priorityHandlerCount,
       int replicationHandlerCount,
-      Function<Pair<RPCProtos.RequestHeader, Message>, Integer> qosFunction,
+      PriorityFunction priority,
       int highPriorityLevel) {
     int maxQueueLength = conf.getInt("ipc.server.max.callqueue.length",
         handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
     this.handlerCount = handlerCount;
     this.priorityHandlerCount = priorityHandlerCount;
     this.replicationHandlerCount = replicationHandlerCount;
-    this.qosFunction = qosFunction;
+    this.priority = priority;
     this.highPriorityLevel = highPriorityLevel;
     this.callQueue = new LinkedBlockingQueue<RpcServer.CallRunner>(maxQueueLength);
     this.priorityCallQueue = priorityHandlerCount > 0
@@ -131,9 +128,7 @@ public class SimpleRpcScheduler implemen
   @Override
   public void dispatch(RpcServer.CallRunner callTask) throws InterruptedException {
     RpcServer.Call call = callTask.getCall();
-    Pair<RPCProtos.RequestHeader, Message> headerAndParam =
-        new Pair<RPCProtos.RequestHeader, Message>(call.header, call.param);
-    int level = getQosLevel(headerAndParam);
+    int level = priority.getPriority(call.header, call.param);
     if (priorityCallQueue != null && level > highPriorityLevel) {
       priorityCallQueue.put(callTask);
     } else if (replicationQueue != null && level == HConstants.REPLICATION_QOS) {
@@ -168,11 +163,5 @@ public class SimpleRpcScheduler implemen
       }
     }
   }
-
-  private int getQosLevel(Pair<RPCProtos.RequestHeader, Message> headerAndParam) {
-    if (qosFunction == null) return 0;
-    Integer res = qosFunction.apply(headerAndParam);
-    return res == null? 0: res;
-  }
 }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java Fri Sep 13 21:18:42 2013
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.exception
 import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorType;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
 import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
 import org.apache.hadoop.hbase.ipc.RequestContext;
@@ -422,17 +424,10 @@ MasterServices, Server {
     HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
     int numHandlers = conf.getInt(HConstants.MASTER_HANDLER_COUNT,
       conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, HConstants.DEFAULT_MASTER_HANLDER_COUNT));
-    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
-        conf,
-        numHandlers,
-        0, // we don't use high priority handlers in master
-        0, // we don't use replication handlers in master
-        null, // this is a DNC w/o high priority handlers
-        0);
     this.rpcServer = new RpcServer(this, name, getServices(),
       initialIsa, // BindAddress is IP we got for this server.
       conf,
-      scheduler);
+      new FifoRpcScheduler(conf, numHandlers));
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
     // We don't want to pass isa's hostname here since it could be 0.0.0.0

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AnnotationReadingPriorityFunction.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.lang.reflect.Method;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CloseRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.CompactRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.FlushRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetStoreFileRequest;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.SplitRegionRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.regionserver.HRegionServer.QosPriority;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Message;
+import com.google.protobuf.TextFormat;
+
+
+/**
+ * Reads special method annotations and table names to figure a priority for use by QoS facility in
+ * ipc; e.g: rpcs to hbase:meta get priority.
+ */
+// TODO: Remove.  This is doing way too much work just to figure a priority.  Do as Elliott
+// suggests and just have the client specify a priority.
+
+//The logic for figuring out high priority RPCs is as follows:
+//1. if the method is annotated with a QosPriority of QOS_HIGH,
+//   that is honored
+//2. parse out the protobuf message and see if the request is for meta
+//   region, and if so, treat it as a high priority RPC
+//Some optimizations for (2) are done here -
+//Clients send the argument classname as part of making the RPC. The server
+//decides whether to deserialize the proto argument message based on the
+//pre-established set of argument classes (knownArgumentClasses below).
+//This prevents the server from having to deserialize all proto argument
+//messages prematurely.
+//All the argument classes declare a 'getRegion' method that returns a
+//RegionSpecifier object. Methods can be invoked on the returned object
+//to figure out whether it is a meta region or not.
+class AnnotationReadingPriorityFunction implements PriorityFunction {
+  public static final Log LOG =
+    LogFactory.getLog(AnnotationReadingPriorityFunction.class.getName());
+  private final Map<String, Integer> annotatedQos;
+  //We need to mock the regionserver instance for some unit tests (set via
+  //setRegionServer method.
+  private HRegionServer hRegionServer;
+  @SuppressWarnings("unchecked")
+  private final Class<? extends Message>[] knownArgumentClasses = new Class[]{
+      GetRegionInfoRequest.class,
+      GetStoreFileRequest.class,
+      CloseRegionRequest.class,
+      FlushRegionRequest.class,
+      SplitRegionRequest.class,
+      CompactRegionRequest.class,
+      GetRequest.class,
+      MutateRequest.class,
+      ScanRequest.class,
+      MultiRequest.class
+  };
+
+  // Some caches for helping performance
+  private final Map<String, Class<? extends Message>> argumentToClassMap =
+    new HashMap<String, Class<? extends Message>>();
+  private final Map<String, Map<Class<? extends Message>, Method>> methodMap =
+    new HashMap<String, Map<Class<? extends Message>, Method>>();
+
+  AnnotationReadingPriorityFunction(final HRegionServer hrs) {
+    this.hRegionServer = hrs;
+    Map<String, Integer> qosMap = new HashMap<String, Integer>();
+    for (Method m : HRegionServer.class.getMethods()) {
+      QosPriority p = m.getAnnotation(QosPriority.class);
+      if (p != null) {
+        // Since we protobuf'd, and then subsequently, when we went with pb style, method names
+        // are capitalized.  This meant that this brittle compare of method names gotten by
+        // reflection no longer matched the method names comeing in over pb.  TODO: Get rid of this
+        // check.  For now, workaround is to capitalize the names we got from reflection so they
+        // have chance of matching the pb ones.
+        String capitalizedMethodName = capitalize(m.getName());
+        qosMap.put(capitalizedMethodName, p.priority());
+      }
+    }
+    this.annotatedQos = qosMap;
+
+    if (methodMap.get("getRegion") == null) {
+      methodMap.put("hasRegion", new HashMap<Class<? extends Message>, Method>());
+      methodMap.put("getRegion", new HashMap<Class<? extends Message>, Method>());
+    }
+    for (Class<? extends Message> cls : knownArgumentClasses) {
+      argumentToClassMap.put(cls.getName(), cls);
+      try {
+        methodMap.get("hasRegion").put(cls, cls.getDeclaredMethod("hasRegion"));
+        methodMap.get("getRegion").put(cls, cls.getDeclaredMethod("getRegion"));
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private String capitalize(final String s) {
+    StringBuilder strBuilder = new StringBuilder(s);
+    strBuilder.setCharAt(0, Character.toUpperCase(strBuilder.charAt(0)));
+    return strBuilder.toString();
+  }
+
+  public boolean isMetaRegion(byte[] regionName) {
+    HRegion region;
+    try {
+      region = hRegionServer.getRegion(regionName);
+    } catch (NotServingRegionException ignored) {
+      return false;
+    }
+    return region.getRegionInfo().isMetaTable();
+  }
+
+  @Override
+  public int getPriority(RequestHeader header, Message param) {
+    String methodName = header.getMethodName();
+    Integer priorityByAnnotation = annotatedQos.get(methodName);
+    if (priorityByAnnotation != null) {
+      return priorityByAnnotation;
+    }
+
+    if (param == null) {
+      return HConstants.NORMAL_QOS;
+    }
+    String cls = param.getClass().getName();
+    Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
+    RegionSpecifier regionSpecifier = null;
+    //check whether the request has reference to meta region or now.
+    try {
+      // Check if the param has a region specifier; the pb methods are hasRegion and getRegion if
+      // hasRegion returns true.  Not all listed methods have region specifier each time.  For
+      // example, the ScanRequest has it on setup but thereafter relies on the scannerid rather than
+      // send the region over every time.
+      Method hasRegion = methodMap.get("hasRegion").get(rpcArgClass);
+      if (hasRegion != null && (Boolean)hasRegion.invoke(param, (Object[])null)) {
+        Method getRegion = methodMap.get("getRegion").get(rpcArgClass);
+        regionSpecifier = (RegionSpecifier)getRegion.invoke(param, (Object[])null);
+        HRegion region = hRegionServer.getRegion(regionSpecifier);
+        if (region.getRegionInfo().isMetaTable()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("High priority because region=" + region.getRegionNameAsString());
+          }
+          return HConstants.HIGH_QOS;
+        }
+      }
+    } catch (Exception ex) {
+      // Not good throwing an exception out of here, a runtime anyways.  Let the query go into the
+      // server and have it throw the exception if still an issue.  Just mark it normal priority.
+      if (LOG.isTraceEnabled()) LOG.trace("Marking normal priority after getting exception=" + ex);
+      return HConstants.NORMAL_QOS;
+    }
+
+    if (methodName.equals("scan")) { // scanner methods...
+      ScanRequest request = (ScanRequest)param;
+      if (!request.hasScannerId()) {
+        return HConstants.NORMAL_QOS;
+      }
+      RegionScanner scanner = hRegionServer.getScanner(request.getScannerId());
+      if (scanner != null && scanner.getRegionInfo().isMetaRegion()) {
+        if (LOG.isTraceEnabled()) {
+          // Scanner requests are small in size so TextFormat version should not overwhelm log.
+          LOG.trace("High priority scanner request " + TextFormat.shortDebugString(request));
+        }
+        return HConstants.HIGH_QOS;
+      }
+    }
+    return HConstants.NORMAL_QOS;
+  }
+
+  @VisibleForTesting
+  void setRegionServer(final HRegionServer hrs) {
+    this.hRegionServer = hrs;
+  }
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Sep 13 21:18:42 2013
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import javax.management.ObjectName;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.lang.annotation.Retention;
@@ -47,8 +49,6 @@ import java.util.concurrent.ConcurrentMa
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.management.ObjectName;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -104,6 +104,7 @@ import org.apache.hadoop.hbase.fs.HFileS
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -111,7 +112,6 @@ import org.apache.hadoop.hbase.ipc.RpcSe
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
-import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -174,6 +174,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.NameStringPair;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.GetLastFlushedSequenceIdRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
@@ -256,6 +257,10 @@ public class HRegionServer implements Cl
   protected final ConcurrentMap<byte[], Boolean> regionsInTransitionInRS =
     new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR);
 
+  /** RPC scheduler to use for the region server. */
+  public static final String REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
+      "hbase.region.server.rpc.scheduler.factory.class";
+
   protected long maxScannerResultSize;
 
   // Cache flushing
@@ -466,9 +471,9 @@ public class HRegionServer implements Cl
   private final int scannerLeaseTimeoutPeriod;
 
   /**
-   * The reference to the QosFunction
+   * The reference to the priority extraction function
    */
-  private final QosFunction qosFunction;
+  private final PriorityFunction priority;
 
   private RegionServerCoprocessorHost rsHost;
 
@@ -552,22 +557,23 @@ public class HRegionServer implements Cl
     String name = "regionserver/" + initialIsa.toString();
     // Set how many times to retry talking to another server over HConnection.
     HConnectionManager.setServerSideHConnectionRetries(this.conf, name, LOG);
-    this.qosFunction = new QosFunction(this);
-    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
-        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
-    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
-        conf,
-        handlerCount,
-        conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
-            HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
-        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
-            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
-        qosFunction,
-        HConstants.QOS_THRESHOLD);
+    this.priority = new AnnotationReadingPriorityFunction(this);
+    RpcSchedulerFactory rpcSchedulerFactory;
+    try {
+      Class<?> rpcSchedulerFactoryClass = conf.getClass(
+          REGION_SERVER_RPC_SCHEDULER_FACTORY_CLASS,
+          SimpleRpcSchedulerFactory.class);
+      rpcSchedulerFactory = ((RpcSchedulerFactory) rpcSchedulerFactoryClass.newInstance());
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException(e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException(e);
+    }
     this.rpcServer = new RpcServer(this, name, getServices(),
       /*HBaseRPCErrorHandler.class, OnlineRegions.class},*/
       initialIsa, // BindAddress is IP we got for this server.
-      conf, scheduler);
+      conf,
+      rpcSchedulerFactory.create(conf, this));
 
     // Set our address.
     this.isa = this.rpcServer.getListenerAddress();
@@ -631,13 +637,18 @@ public class HRegionServer implements Cl
     return this.clusterId;
   }
 
+  @Override
+  public int getPriority(RequestHeader header, Message param) {
+    return priority.getPriority(header, param);
+  }
+
   @Retention(RetentionPolicy.RUNTIME)
   protected @interface QosPriority {
     int priority() default 0;
   }
 
-  QosFunction getQosFunction() {
-    return qosFunction;
+  PriorityFunction getPriority() {
+    return priority;
   }
 
   RegionScanner getScanner(long scannerId) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java Fri Sep 13 21:18:42 2013
@@ -18,25 +18,27 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
 import org.apache.hadoop.hbase.executor.ExecutorService;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.zookeeper.KeeperException;
 
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
+
 /**
  * Services provided by {@link HRegionServer}
  */
 @InterfaceAudience.Private
-public interface RegionServerServices extends OnlineRegions, FavoredNodesForRegion {
+public interface RegionServerServices
+    extends OnlineRegions, FavoredNodesForRegion, PriorityFunction {
   /**
    * @return True if this regionserver is stopping.
    */

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RpcSchedulerFactory.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+
+/**
+ * A factory class that constructs an {@link org.apache.hadoop.hbase.ipc.RpcScheduler} for
+ * a region server.
+ */
+public interface RpcSchedulerFactory {
+
+  /**
+   * Constructs a {@link org.apache.hadoop.hbase.ipc.RpcScheduler}.
+   *
+   * Please note that this method is called in constructor of {@link HRegionServer}, so some
+   * fields may not be ready for access. The reason that {@code HRegionServer} is passed as
+   * parameter here is that an RPC scheduler may need to access data structure inside
+   * {@code HRegionServer} (see example in {@link SimpleRpcSchedulerFactory}).
+   */
+  RpcScheduler create(Configuration conf, RegionServerServices server);
+}

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcScheduler;
+import org.apache.hadoop.hbase.ipc.SimpleRpcScheduler;
+
+/** Constructs a {@link SimpleRpcScheduler}. for the region server. */
+class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
+
+  @Override
+  public RpcScheduler create(Configuration conf, RegionServerServices server) {
+    int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
+        HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
+    return new SimpleRpcScheduler(
+        conf,
+        handlerCount,
+        conf.getInt(HConstants.REGION_SERVER_META_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_META_HANDLER_COUNT),
+        conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
+            HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
+        server,
+        HConstants.QOS_THRESHOLD);
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java Fri Sep 13 21:18:42 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.catalog.CatalogTracker;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.fs.HFileS
 import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.master.TableLockManager;
 import org.apache.hadoop.hbase.master.TableLockManager.NullTableLockManager;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -214,4 +216,9 @@ class MockRegionServerServices implement
     // TODO Auto-generated method stub
     return null;
   }
+
+  @Override
+  public int getPriority(RPCProtos.RequestHeader header, Message param) {
+    return 0;
+  }
 }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestDelayedRpc.java Fri Sep 13 21:18:42 2013
@@ -86,7 +86,7 @@ public class TestDelayedRpc {
         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
         isa,
         conf,
-        new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
+        new FifoRpcScheduler(conf, 1));
     rpcServer.start();
     RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
@@ -167,7 +167,7 @@ public class TestDelayedRpc {
       Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
         isa,
         conf,
-        new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
+        new FifoRpcScheduler(conf, 1));
     rpcServer.start();
     RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {
@@ -289,7 +289,7 @@ public class TestDelayedRpc {
         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
         isa,
         conf,
-        new SimpleRpcScheduler(conf, 1, 0, 0, null, 0));
+        new FifoRpcScheduler(conf, 1));
     rpcServer.start();
     RpcClient rpcClient = new RpcClient(conf, HConstants.DEFAULT_CLUSTER_ID.toString());
     try {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java Fri Sep 13 21:18:42 2013
@@ -21,18 +21,26 @@ package org.apache.hadoop.hbase.ipc;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.net.SocketFactory;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -64,6 +72,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Matchers;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -139,9 +148,13 @@ public class TestIPC {
   private static class TestRpcServer extends RpcServer {
 
     TestRpcServer() throws IOException {
+      this(new FifoRpcScheduler(CONF, 1));
+    }
+
+    TestRpcServer(RpcScheduler scheduler) throws IOException {
       super(null, "testRpcServer",
           Lists.newArrayList(new BlockingServiceAndInterface(SERVICE, null)),
-        new InetSocketAddress("0.0.0.0", 0), CONF, new SimpleRpcScheduler(CONF, 1, 1, 0, null, 0));
+        new InetSocketAddress("0.0.0.0", 0), CONF, scheduler);
     }
 
     @Override
@@ -257,6 +270,29 @@ public class TestIPC {
     }
   }
 
+  /** Tests that the rpc scheduler is called when requests arrive. */
+  @Test
+  public void testRpcScheduler() throws IOException, InterruptedException {
+    RpcScheduler scheduler = spy(new FifoRpcScheduler(CONF, 1));
+    RpcServer rpcServer = new TestRpcServer(scheduler);
+    verify(scheduler).init((RpcScheduler.Context) anyObject());
+    RpcClient client = new RpcClient(CONF, HConstants.CLUSTER_ID_DEFAULT);
+    try {
+      rpcServer.start();
+      verify(scheduler).start();
+      MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
+      EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
+      for (int i = 0; i < 10; i++) {
+        client.call(md, param, CellUtil.createCellScanner(ImmutableList.of(CELL)),
+            md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(), 0);
+      }
+      verify(scheduler, times(10)).dispatch((RpcServer.CallRunner) anyObject());
+    } finally {
+      rpcServer.stop();
+      verify(scheduler).stop();
+    }
+  }
+
   public static void main(String[] args)
   throws IOException, SecurityException, NoSuchMethodException, InterruptedException {
     if (args.length != 2) {
@@ -285,7 +321,8 @@ public class TestIPC {
       for (int i = 0; i < cycles; i++) {
         List<CellScannable> cells = new ArrayList<CellScannable>();
         // Message param = RequestConverter.buildMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm);
-        Message param = RequestConverter.buildNoDataMultiRequest(HConstants.EMPTY_BYTE_ARRAY, rm, cells);
+        Message param = RequestConverter.buildNoDataMultiRequest(
+            HConstants.EMPTY_BYTE_ARRAY, rm, cells);
         CellScanner cellScanner = CellUtil.createCellScanner(cells);
         if (i % 1000 == 0) {
           LOG.info("" + i);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestProtoBufRpc.java Fri Sep 13 21:18:42 2013
@@ -99,7 +99,7 @@ public class TestProtoBufRpc {
     this.server = new RpcServer(null, "testrpc",
         Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(service, null)),
         new InetSocketAddress(ADDRESS, PORT), conf,
-        new SimpleRpcScheduler(conf, 10, 10, 0, null, 0));
+        new FifoRpcScheduler(conf, 10));
     this.isa = server.getListenerAddress();
     this.server.start();
   }

Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java?rev=1523108&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java Fri Sep 13 21:18:42 2013
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.protobuf.Message;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.ipc.RpcServer.Call;
+import org.apache.hadoop.hbase.ipc.RpcServer.CallRunner;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@Category(SmallTests.class)
+public class TestSimpleRpcScheduler {
+
+  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
+    @Override
+    public InetSocketAddress getListenerAddress() {
+      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
+    }
+  };
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+  }
+
+  @Test
+  public void testBasic() throws IOException, InterruptedException {
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    RpcScheduler scheduler = new SimpleRpcScheduler(
+        conf, 10, 0, 0, qosFunction, 0);
+    scheduler.init(CONTEXT);
+    scheduler.start();
+    CallRunner task = createMockTask();
+    scheduler.dispatch(task);
+    verify(task, timeout(1000)).run();
+    scheduler.stop();
+  }
+
+  @Test
+  public void testHandlerIsolation() throws IOException, InterruptedException {
+    CallRunner generalTask = createMockTask();
+    CallRunner priorityTask = createMockTask();
+    CallRunner replicationTask = createMockTask();
+    List<CallRunner> tasks = ImmutableList.of(
+        generalTask,
+        priorityTask,
+        replicationTask);
+    Map<CallRunner, Integer> qos = ImmutableMap.of(
+        generalTask, 0,
+        priorityTask, HConstants.HIGH_QOS + 1,
+        replicationTask, HConstants.REPLICATION_QOS);
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    final Map<CallRunner, Thread> handlerThreads = Maps.newHashMap();
+    Answer<Void> answerToRun = new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+        handlerThreads.put(
+            (CallRunner) invocationOnMock.getMock(),
+            Thread.currentThread());
+        return null;
+      }
+    };
+    for (CallRunner task : tasks) {
+      doAnswer(answerToRun).when(task).run();
+    }
+
+    RpcScheduler scheduler = new SimpleRpcScheduler(
+        conf, 1, 1 ,1, qosFunction, HConstants.HIGH_QOS);
+    scheduler.init(CONTEXT);
+    scheduler.start();
+    for (CallRunner task : tasks) {
+      when(qosFunction.getPriority((RPCProtos.RequestHeader) anyObject(), (Message) anyObject()))
+          .thenReturn(qos.get(task));
+      scheduler.dispatch(task);
+    }
+    for (CallRunner task : tasks) {
+      verify(task, timeout(1000)).run();
+    }
+    scheduler.stop();
+
+    // Tests that these requests are handled by three distinct threads.
+    assertEquals(3, ImmutableSet.copyOf(handlerThreads.values()).size());
+  }
+
+  private CallRunner createMockTask() {
+    Call call = mock(Call.class);
+    CallRunner task = mock(CallRunner.class);
+    when(task.getCall()).thenReturn(call);
+    return task;
+  }
+}

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java Fri Sep 13 21:18:42 2013
@@ -27,6 +27,7 @@ import java.util.Random;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import com.google.protobuf.Message;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.CellScannable;
@@ -86,6 +87,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
+import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.regionserver.CompactionRequestor;
 import org.apache.hadoop.hbase.regionserver.FlushRequester;
 import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -558,6 +560,11 @@ ClientProtos.ClientService.BlockingInter
   }
 
   @Override
+  public int getPriority(RPCProtos.RequestHeader header, Message param) {
+    return 0;
+  }
+
+  @Override
   public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
       UpdateFavoredNodesRequest request) throws ServiceException {
     return null;

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityRpc.java Fri Sep 13 21:18:42 2013
@@ -19,7 +19,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 
@@ -28,42 +29,41 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Get;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
-import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.mockito.Mockito;
 
 import com.google.protobuf.ByteString;
-import com.google.protobuf.Message;
 /**
  * Tests that verify certain RPCs get a higher QoS.
  */
 @Category(MediumTests.class)
 public class TestPriorityRpc {
   private HRegionServer regionServer = null;
-  private QosFunction qosFunction = null;
+  private PriorityFunction priority = null;
 
   @Before
   public void setup() {
     Configuration conf = HBaseConfiguration.create();
     regionServer = HRegionServer.constructRegionServer(HRegionServer.class, conf);
-    qosFunction = regionServer.getQosFunction();
+    priority = regionServer.getPriority();
   }
 
   @Test
   public void testQosFunctionForMeta() throws IOException {
-    qosFunction = regionServer.getQosFunction();
+    priority = regionServer.getPriority();
     RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
     //create a rpc request that has references to hbase:meta region and also
     //uses one of the known argument classes (known argument classes are
-    //listed in HRegionServer.QosFunction.knownArgumentClasses)
+    //listed in HRegionServer.QosFunctionImpl.knownArgumentClasses)
     headerBuilder.setMethodName("foo");
 
     GetRequest.Builder getRequestBuilder = GetRequest.newBuilder();
@@ -84,9 +84,9 @@ public class TestPriorityRpc {
     Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
     Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
     Mockito.when(mockRegionInfo.isMetaTable()).thenReturn(true);
-    qosFunction.setRegionServer(mockRS);
-    assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, getRequest)) ==
-      HConstants.HIGH_QOS);
+    // Presume type.
+    ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
+    assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, getRequest));
   }
 
   @Test
@@ -94,13 +94,12 @@ public class TestPriorityRpc {
     //The request is not using any of the
     //known argument classes (it uses one random request class)
     //(known argument classes are listed in
-    //HRegionServer.QosFunction.knownArgumentClasses)
+    //HRegionServer.QosFunctionImpl.knownArgumentClasses)
     RequestHeader.Builder headerBuilder = RequestHeader.newBuilder();
     headerBuilder.setMethodName("foo");
     RequestHeader header = headerBuilder.build();
-    QosFunction qosFunc = regionServer.getQosFunction();
-    assertTrue (qosFunc.apply(new Pair<RequestHeader, Message>(header, null)) ==
-      HConstants.NORMAL_QOS);
+    PriorityFunction qosFunc = regionServer.getPriority();
+    assertEquals(HConstants.NORMAL_QOS, qosFunc.getPriority(header, null));
   }
 
   @Test
@@ -118,8 +117,9 @@ public class TestPriorityRpc {
     Mockito.when(mockRS.getRegion((RegionSpecifier)Mockito.any())).thenReturn(mockRegion);
     Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
     Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
-    qosFunction.setRegionServer(mockRS);
-    int qos = qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest));
+    // Presume type.
+    ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
+    int qos = priority.getPriority(header, scanRequest);
     assertTrue ("" + qos, qos == HConstants.NORMAL_QOS);
 
     //build a scan request with scannerID
@@ -134,14 +134,13 @@ public class TestPriorityRpc {
     Mockito.when(mockRegion.getRegionInfo()).thenReturn(mockRegionInfo);
     Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(true);
 
-    qosFunction.setRegionServer(mockRS);
+    // Presume type.
+    ((AnnotationReadingPriorityFunction)priority).setRegionServer(mockRS);
 
-    assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest)) ==
-      HConstants.HIGH_QOS);
+    assertEquals(HConstants.HIGH_QOS, priority.getPriority(header, scanRequest));
 
     //the same as above but with non-meta region
     Mockito.when(mockRegionInfo.isMetaRegion()).thenReturn(false);
-    assertTrue (qosFunction.apply(new Pair<RequestHeader, Message>(header, scanRequest)) ==
-      HConstants.NORMAL_QOS);
+    assertEquals(HConstants.NORMAL_QOS, priority.getPriority(header, scanRequest));
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQosFunction.java Fri Sep 13 21:18:42 2013
@@ -37,7 +37,7 @@ public class TestQosFunction {
   @Test
   public void testPriority() {
     HRegionServer hrs = Mockito.mock(HRegionServer.class);
-    QosFunction qosFunction = new QosFunction(hrs);
+    AnnotationReadingPriorityFunction qosFunction = new AnnotationReadingPriorityFunction(hrs);
 
     // Set method name in pb style with the method name capitalized.
     checkMethod("ReplicateWALEntry", HConstants.REPLICATION_QOS, qosFunction);
@@ -45,11 +45,9 @@ public class TestQosFunction {
     checkMethod("OpenRegion", HConstants.HIGH_QOS, qosFunction);
   }
 
-  private void checkMethod(final String methodName, final int expected, final QosFunction qosf) {
+  private void checkMethod(final String methodName, final int expected, final AnnotationReadingPriorityFunction qosf) {
     RequestHeader.Builder builder = RequestHeader.newBuilder();
     builder.setMethodName(methodName);
-    Pair<RequestHeader, Message> headerAndParam =
-      new Pair<RequestHeader, Message>(builder.build(), null);
-    assertEquals(methodName, expected, qosf.apply(headerAndParam).intValue());
+    assertEquals(methodName, expected, qosf.getPriority(builder.build(), null));
   }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java?rev=1523108&r1=1523107&r2=1523108&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestTokenAuthentication.java Fri Sep 13 21:18:42 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.catalog.C
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
+import org.apache.hadoop.hbase.ipc.FifoRpcScheduler;
 import org.apache.hadoop.hbase.ipc.RequestContext;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcServer;
@@ -130,10 +131,8 @@ public class TestTokenAuthentication {
         AuthenticationProtos.AuthenticationService.newReflectiveBlockingService(this);
       sai.add(new BlockingServiceAndInterface(service,
         AuthenticationProtos.AuthenticationService.BlockingInterface.class));
-      SimpleRpcScheduler scheduler = new SimpleRpcScheduler(
-          conf, 3, 1, 0, null, HConstants.QOS_THRESHOLD);
       this.rpcServer =
-        new RpcServer(this, "tokenServer", sai, initialIsa, conf, scheduler);
+        new RpcServer(this, "tokenServer", sai, initialIsa, conf, new FifoRpcScheduler(conf, 1));
       this.isa = this.rpcServer.getListenerAddress();
       this.sleeper = new Sleeper(1000, this);
     }



Mime
View raw message