hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From x...@apache.org
Subject [2/2] hadoop git commit: HADOOP-12916. Allow RPC scheduler/callqueue backoff using response times. Contributed by Xiaoyu Yao.
Date Thu, 31 Mar 2016 15:48:12 GMT
HADOOP-12916. Allow RPC scheduler/callqueue backoff using response times. Contributed by Xiaoyu Yao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d95c6eb3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d95c6eb3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d95c6eb3

Branch: refs/heads/trunk
Commit: d95c6eb32cec7768ac418fb467b1198ccf3cf0dc
Parents: 0a74610
Author: Xiaoyu Yao <xyao@apache.org>
Authored: Thu Mar 31 08:42:57 2016 -0700
Committer: Xiaoyu Yao <xyao@apache.org>
Committed: Thu Mar 31 08:42:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/conf/Configuration.java   |  13 +
 .../hadoop/fs/CommonConfigurationKeys.java      |  14 +-
 .../org/apache/hadoop/ipc/CallQueueManager.java | 124 +++++-
 .../apache/hadoop/ipc/DecayRpcScheduler.java    | 396 +++++++++++++++----
 .../hadoop/ipc/DecayRpcSchedulerMXBean.java     |   2 +
 .../apache/hadoop/ipc/DefaultRpcScheduler.java  |  45 +++
 .../org/apache/hadoop/ipc/FairCallQueue.java    |  45 +--
 .../apache/hadoop/ipc/ProtobufRpcEngine.java    |   8 +-
 .../org/apache/hadoop/ipc/RpcScheduler.java     |   8 +-
 .../java/org/apache/hadoop/ipc/Schedulable.java |   5 +-
 .../main/java/org/apache/hadoop/ipc/Server.java |  77 +++-
 .../apache/hadoop/ipc/WritableRpcEngine.java    |  45 +--
 .../apache/hadoop/ipc/TestCallQueueManager.java | 147 ++++++-
 .../hadoop/ipc/TestDecayRpcScheduler.java       |  42 +-
 .../apache/hadoop/ipc/TestFairCallQueue.java    |  79 ++--
 .../hadoop/ipc/TestIdentityProviders.java       |  18 +-
 .../java/org/apache/hadoop/ipc/TestRPC.java     |  92 ++++-
 17 files changed, 893 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
index 8355d96..4c8f27b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java
@@ -1626,6 +1626,10 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
       return defaultValue;
     }
     vStr = vStr.trim();
+    return getTimeDurationHelper(name, vStr, unit);
+  }
+
+  private long getTimeDurationHelper(String name, String vStr, TimeUnit unit) {
     ParsedTimeDuration vUnit = ParsedTimeDuration.unitFor(vStr);
     if (null == vUnit) {
       LOG.warn("No unit for " + name + "(" + vStr + ") assuming " + unit);
@@ -1636,6 +1640,15 @@ public class Configuration implements Iterable<Map.Entry<String,String>>,
     return unit.convert(Long.parseLong(vStr), vUnit.unit());
   }
 
+  public long[] getTimeDurations(String name, TimeUnit unit) {
+    String[] strings = getTrimmedStrings(name);
+    long[] durations = new long[strings.length];
+    for (int i = 0; i < strings.length; i++) {
+      durations[i] = getTimeDurationHelper(name, strings[i], unit);
+    }
+    return durations;
+  }
+
   /**
    * Get the value of the <code>name</code> property as a <code>Pattern</code>.
    * If no such property is specified, or if the specified value is not a valid

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 9b4069a..a708900 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -90,14 +90,22 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   /**
    * CallQueue related settings. These are not used directly, but rather
    * combined with a namespace and port. For instance:
-   * IPC_CALLQUEUE_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
+   * IPC_NAMESPACE + ".8020." + IPC_CALLQUEUE_IMPL_KEY
    */
-  public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
+  public static final String IPC_NAMESPACE = "ipc";
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
-  public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+  public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
+  public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
   public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
   public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
 
+  /**
+   * IPC scheduler priority levels.
+   */
+  public static final String IPC_SCHEDULER_PRIORITY_LEVELS_KEY =
+      "scheduler.priority.levels";
+  public static final int IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY = 4;
+
   /** This is for specifying the implementation for the mappings from
    * hostnames to the racks they belong to
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
index 2ee15d3..1a7782a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 
 /**
  * Abstracts queue operations for different blocking queues.
@@ -43,6 +44,13 @@ public class CallQueueManager<E> {
       Class<?> queueClass, Class<E> elementClass) {
     return (Class<? extends BlockingQueue<E>>)queueClass;
   }
+
+  @SuppressWarnings("unchecked")
+  static Class<? extends RpcScheduler> convertSchedulerClass(
+      Class<?> schedulerClass) {
+    return (Class<? extends RpcScheduler>)schedulerClass;
+  }
+
   private final boolean clientBackOffEnabled;
 
   // Atomic refs point to active callQueue
@@ -50,25 +58,76 @@ public class CallQueueManager<E> {
   private final AtomicReference<BlockingQueue<E>> putRef;
   private final AtomicReference<BlockingQueue<E>> takeRef;
 
+  private RpcScheduler scheduler;
+
   public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
+                          Class<? extends RpcScheduler> schedulerClass,
       boolean clientBackOffEnabled, int maxQueueSize, String namespace,
       Configuration conf) {
+    int priorityLevels = parseNumLevels(namespace, conf);
+    this.scheduler = createScheduler(schedulerClass, priorityLevels,
+        namespace, conf);
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
-      maxQueueSize, namespace, conf);
+        priorityLevels, maxQueueSize, namespace, conf);
     this.clientBackOffEnabled = clientBackOffEnabled;
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
-    LOG.info("Using callQueue " + backingClass);
+    LOG.info("Using callQueue: " + backingClass + " scheduler: " +
+        schedulerClass);
+  }
+
+  private static <T extends RpcScheduler> T createScheduler(
+      Class<T> theClass, int priorityLevels, String ns, Configuration conf) {
+    // Used for custom, configurable scheduler
+    try {
+      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
+          String.class, Configuration.class);
+      return ctor.newInstance(priorityLevels, ns, conf);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(theClass.getName()
+          + " could not be constructed.", e.getCause());
+    } catch (Exception e) {
+    }
+
+    try {
+      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class);
+      return ctor.newInstance(priorityLevels);
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(theClass.getName()
+          + " could not be constructed.", e.getCause());
+    } catch (Exception e) {
+    }
+
+    // Last attempt
+    try {
+      Constructor<T> ctor = theClass.getDeclaredConstructor();
+      return ctor.newInstance();
+    } catch (RuntimeException e) {
+      throw e;
+    } catch (InvocationTargetException e) {
+      throw new RuntimeException(theClass.getName()
+          + " could not be constructed.", e.getCause());
+    } catch (Exception e) {
+    }
+
+    // Nothing worked
+    throw new RuntimeException(theClass.getName() +
+        " could not be constructed.");
   }
 
   private <T extends BlockingQueue<E>> T createCallQueueInstance(
-      Class<T> theClass, int maxLen, String ns, Configuration conf) {
+      Class<T> theClass, int priorityLevels, int maxLen, String ns,
+      Configuration conf) {
 
     // Used for custom, configurable callqueues
     try {
-      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class, String.class,
-        Configuration.class);
-      return ctor.newInstance(maxLen, ns, conf);
+      Constructor<T> ctor = theClass.getDeclaredConstructor(int.class,
+          int.class, String.class, Configuration.class);
+      return ctor.newInstance(priorityLevels, maxLen, ns, conf);
     } catch (RuntimeException e) {
       throw e;
     } catch (InvocationTargetException e) {
@@ -110,6 +169,22 @@ public class CallQueueManager<E> {
     return clientBackOffEnabled;
   }
 
+  // Based on policy to determine back off current call
+  boolean shouldBackOff(Schedulable e) {
+    return scheduler.shouldBackOff(e);
+  }
+
+  void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime) {
+    scheduler.addResponseTime(name, priorityLevel, queueTime, processingTime);
+  }
+
+  // This should be only called once per call and cached in the call object
+  // each getPriorityLevel call will increment the counter for the caller
+  int getPriorityLevel(Schedulable e) {
+    return scheduler.getPriorityLevel(e);
+  }
+
   /**
    * Insert e into the backing queue or block until we can.
    * If we block and the queue changes on us, we will insert while the
@@ -147,14 +222,45 @@ public class CallQueueManager<E> {
   }
 
   /**
+   * Read the number of levels from the configuration.
+   * This will affect the FairCallQueue's overall capacity.
+   * @throws IllegalArgumentException on invalid queue count
+   */
+  @SuppressWarnings("deprecation")
+  private static int parseNumLevels(String ns, Configuration conf) {
+    // Fair call queue levels (IPC_CALLQUEUE_PRIORITY_LEVELS_KEY)
+    // takes priority over the scheduler level key
+    // (IPC_SCHEDULER_PRIORITY_LEVELS_KEY)
+    int retval = conf.getInt(ns + "." +
+        FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 0);
+    if (retval == 0) { // No FCQ priority level configured
+      retval = conf.getInt(ns + "." +
+          CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY,
+          CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_DEFAULT_KEY);
+    } else {
+      LOG.warn(ns + "." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY +
+          " is deprecated. Please use " + ns + "." +
+          CommonConfigurationKeys.IPC_SCHEDULER_PRIORITY_LEVELS_KEY + ".");
+    }
+    if(retval < 1) {
+      throw new IllegalArgumentException("numLevels must be at least 1");
+    }
+    return retval;
+  }
+
+  /**
    * Replaces active queue with the newly requested one and transfers
    * all calls to the newQ before returning.
    */
   public synchronized void swapQueue(
+      Class<? extends RpcScheduler> schedulerClass,
       Class<? extends BlockingQueue<E>> queueClassToUse, int maxSize,
       String ns, Configuration conf) {
-    BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse, maxSize,
-      ns, conf);
+    int priorityLevels = parseNumLevels(ns, conf);
+    RpcScheduler newScheduler = createScheduler(schedulerClass, priorityLevels,
+        ns, conf);
+    BlockingQueue<E> newQ = createCallQueueInstance(queueClassToUse,
+        priorityLevels, maxSize, ns, conf);
 
     // Our current queue becomes the old queue
     BlockingQueue<E> oldQ = putRef.get();
@@ -168,6 +274,8 @@ public class CallQueueManager<E> {
     // Swap takeRef to handle new calls
     takeRef.set(newQ);
 
+    this.scheduler = newScheduler;
+
     LOG.info("Old Queue: " + stringRepr(oldQ) + ", " +
       "Replacement: " + stringRepr(newQ));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
index a6a14d0..4237339 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java
@@ -27,17 +27,21 @@ import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicLongArray;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.util.concurrent.AtomicDoubleArray;
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.util.MBeans;
 
 import org.codehaus.jackson.map.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The decay RPC scheduler counts incoming requests in a map, then
@@ -49,22 +53,28 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   /**
    * Period controls how many milliseconds between each decay sweep.
    */
-  public static final String IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY =
+  public static final String IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY =
+      "decay-scheduler.period-ms";
+  public static final long IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT =
+      5000;
+  @Deprecated
+  public static final String IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY =
     "faircallqueue.decay-scheduler.period-ms";
-  public static final long   IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT =
-    5000L;
 
   /**
    * Decay factor controls how much each count is suppressed by on each sweep.
    * Valid numbers are > 0 and < 1. Decay factor works in tandem with period
    * to control how long the scheduler remembers an identity.
    */
-  public static final String IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY =
+  public static final String IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY =
+      "decay-scheduler.decay-factor";
+  public static final double IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT =
+      0.5;
+  @Deprecated
+  public static final String IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY =
     "faircallqueue.decay-scheduler.decay-factor";
-  public static final double IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT =
-    0.5;
 
-  /**
+ /**
    * Thresholds are specified as integer percentages, and specify which usage
    * range each queue will be allocated to. For instance, specifying the list
    *  10, 40, 80
@@ -74,15 +84,31 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
    * - q1 from 10 up to 40
    * - q0 otherwise.
    */
-  public static final String IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY =
-    "faircallqueue.decay-scheduler.thresholds";
+  public static final String IPC_DECAYSCHEDULER_THRESHOLDS_KEY =
+      "decay-scheduler.thresholds";
+  @Deprecated
+  public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
+      "faircallqueue.decay-scheduler.thresholds";
 
   // Specifies the identity to use when the IdentityProvider cannot handle
   // a schedulable.
   public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
-    "IdentityProvider.Unknown";
+      "IdentityProvider.Unknown";
+
+  public static final String
+      IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY =
+      "decay-scheduler.backoff.responsetime.enable";
+  public static final Boolean
+      IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT = false;
+
+  // Specifies the average response time (ms) thresholds of each
+  // level to trigger backoff
+  public static final String
+      IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY =
+      "decay-scheduler.backoff.responsetime.thresholds";
 
-  public static final Log LOG = LogFactory.getLog(DecayRpcScheduler.class);
+  public static final Logger LOG =
+      LoggerFactory.getLogger(DecayRpcScheduler.class);
 
   // Track the number of calls for each schedulable identity
   private final ConcurrentHashMap<Object, AtomicLong> callCounts =
@@ -91,6 +117,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   // Should be the sum of all AtomicLongs in callCounts
   private final AtomicLong totalCalls = new AtomicLong();
 
+  // Track total call count and response time in current decay window
+  private final AtomicLongArray responseTimeCountInCurrWindow;
+  private final AtomicLongArray responseTimeTotalInCurrWindow;
+
+  // Track average response time in previous decay window
+  private final AtomicDoubleArray responseTimeAvgInLastWindow;
+  private final AtomicLongArray responseTimeCountInLastWindow;
+
   // Pre-computed scheduling decisions during the decay sweep are
   // atomically swapped in as a read-only map
   private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
@@ -98,10 +132,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
 
   // Tune the behavior of the scheduler
   private final long decayPeriodMillis; // How long between each tick
-  private final double decayFactor; // nextCount = currentCount / decayFactor
-  private final int numQueues; // affects scheduling decisions, from 0 to numQueues - 1
+  private final double decayFactor; // nextCount = currentCount * decayFactor
+  private final int numLevels;
   private final double[] thresholds;
   private final IdentityProvider identityProvider;
+  private final boolean backOffByResponseTimeEnabled;
+  private final long[] backOffResponseTimeThresholds;
 
   /**
    * This TimerTask will call decayCurrentCounts until
@@ -132,35 +168,46 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
 
   /**
    * Create a decay scheduler.
-   * @param numQueues number of queues to schedule for
+   * @param numLevels number of priority levels
    * @param ns config prefix, so that we can configure multiple schedulers
    *           in a single instance.
    * @param conf configuration to use.
    */
-  public DecayRpcScheduler(int numQueues, String ns, Configuration conf) {
-    if (numQueues < 1) {
-      throw new IllegalArgumentException("number of queues must be > 0");
+  public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
+    if(numLevels < 1) {
+      throw new IllegalArgumentException("Number of Priority Levels must be " +
+          "at least 1");
     }
-
-    this.numQueues = numQueues;
+    this.numLevels = numLevels;
     this.decayFactor = parseDecayFactor(ns, conf);
     this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
     this.identityProvider = this.parseIdentityProvider(ns, conf);
-    this.thresholds = parseThresholds(ns, conf, numQueues);
+    this.thresholds = parseThresholds(ns, conf, numLevels);
+    this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
+        conf);
+    this.backOffResponseTimeThresholds =
+        parseBackOffResponseTimeThreshold(ns, conf, numLevels);
 
     // Setup delay timer
     Timer timer = new Timer();
     DecayTask task = new DecayTask(this, timer);
     timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
 
-    MetricsProxy prox = MetricsProxy.getInstance(ns);
+    // Setup response time metrics
+    responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
+    responseTimeCountInCurrWindow = new AtomicLongArray(numLevels);
+    responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels);
+    responseTimeCountInLastWindow = new AtomicLongArray(numLevels);
+
+    MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
     prox.setDelegate(this);
   }
 
   // Load configs
-  private IdentityProvider parseIdentityProvider(String ns, Configuration conf) {
+  private IdentityProvider parseIdentityProvider(String ns,
+      Configuration conf) {
     List<IdentityProvider> providers = conf.getInstances(
-      ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+      ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
       IdentityProvider.class);
 
     if (providers.size() < 1) {
@@ -174,10 +221,16 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
 
   private static double parseDecayFactor(String ns, Configuration conf) {
     double factor = conf.getDouble(ns + "." +
-        IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
-      IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT
-    );
-
+        IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, 0.0);
+    if (factor == 0.0) {
+      factor = conf.getDouble(ns + "." +
+          IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY,
+          IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT);
+    } else if ((factor > 0.0) && (factor < 1)) {
+      LOG.warn(IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY +
+          " is deprecated. Please use " +
+          IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY + ".");
+    }
     if (factor <= 0 || factor >= 1) {
       throw new IllegalArgumentException("Decay Factor " +
         "must be between 0 and 1");
@@ -188,10 +241,17 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
 
   private static long parseDecayPeriodMillis(String ns, Configuration conf) {
     long period = conf.getLong(ns + "." +
-        IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
-      IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT
-    );
-
+        IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
+        0);
+    if (period == 0) {
+      period = conf.getLong(ns + "." +
+          IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY,
+          IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT);
+    } else if (period > 0) {
+      LOG.warn((IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY +
+          " is deprecated. Please use " +
+          IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY));
+    }
     if (period <= 0) {
       throw new IllegalArgumentException("Period millis must be >= 0");
     }
@@ -200,15 +260,24 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   }
 
   private static double[] parseThresholds(String ns, Configuration conf,
-      int numQueues) {
+      int numLevels) {
     int[] percentages = conf.getInts(ns + "." +
-      IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY);
+        IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY);
 
     if (percentages.length == 0) {
-      return getDefaultThresholds(numQueues);
-    } else if (percentages.length != numQueues-1) {
+      percentages = conf.getInts(ns + "." + IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
+      if (percentages.length == 0) {
+        return getDefaultThresholds(numLevels);
+      }
+    } else {
+      LOG.warn(IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY +
+          " is deprecated. Please use " +
+          IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
+    }
+
+    if (percentages.length != numLevels-1) {
       throw new IllegalArgumentException("Number of thresholds should be " +
-        (numQueues-1) + ". Was: " + percentages.length);
+        (numLevels-1) + ". Was: " + percentages.length);
     }
 
     // Convert integer percentages to decimals
@@ -223,14 +292,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   /**
    * Generate default thresholds if user did not specify. Strategy is
    * to halve each time, since queue usage tends to be exponential.
-   * So if numQueues is 4, we would generate: double[]{0.125, 0.25, 0.5}
+   * So if numLevels is 4, we would generate: double[]{0.125, 0.25, 0.5}
    * which specifies the boundaries between each queue's usage.
-   * @param numQueues number of queues to compute for
-   * @return array of boundaries of length numQueues - 1
+   * @param numLevels number of levels to compute for
+   * @return array of boundaries of length numLevels - 1
    */
-  private static double[] getDefaultThresholds(int numQueues) {
-    double[] ret = new double[numQueues - 1];
-    double div = Math.pow(2, numQueues - 1);
+  private static double[] getDefaultThresholds(int numLevels) {
+    double[] ret = new double[numLevels - 1];
+    double div = Math.pow(2, numLevels - 1);
 
     for (int i = 0; i < ret.length; i++) {
       ret[i] = Math.pow(2, i)/div;
@@ -238,39 +307,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
     return ret;
   }
 
+  private static long[] parseBackOffResponseTimeThreshold(String ns,
+      Configuration conf, int numLevels) {
+    long[] responseTimeThresholds = conf.getTimeDurations(ns + "." +
+            IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY,
+        TimeUnit.MILLISECONDS);
+    // backoff thresholds not specified
+    if (responseTimeThresholds.length == 0) {
+      return getDefaultBackOffResponseTimeThresholds(numLevels);
+    }
+    // backoff thresholds specified but not match with the levels
+    if (responseTimeThresholds.length != numLevels) {
+      throw new IllegalArgumentException(
+          "responseTimeThresholds must match with the number of priority " +
+          "levels");
+    }
+    // invalid thresholds
+    for (long responseTimeThreshold: responseTimeThresholds) {
+      if (responseTimeThreshold <= 0) {
+        throw new IllegalArgumentException(
+            "responseTimeThreshold millis must be >= 0");
+      }
+    }
+    return responseTimeThresholds;
+  }
+
+  // 10s for level 0, 20s for level 1, 30s for level 2, ...
+  private static long[] getDefaultBackOffResponseTimeThresholds(int numLevels) {
+    long[] ret = new long[numLevels];
+    for (int i = 0; i < ret.length; i++) {
+      ret[i] = 10000*(i+1);
+    }
+    return ret;
+  }
+
+  private static Boolean parseBackOffByResponseTimeEnabled(String ns,
+      Configuration conf) {
+    return conf.getBoolean(ns + "." +
+        IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
+        IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT);
+  }
+
   /**
    * Decay the stored counts for each user and clean as necessary.
    * This method should be called periodically in order to keep
    * counts current.
    */
   private void decayCurrentCounts() {
-    long total = 0;
-    Iterator<Map.Entry<Object, AtomicLong>> it =
-      callCounts.entrySet().iterator();
-
-    while (it.hasNext()) {
-      Map.Entry<Object, AtomicLong> entry = it.next();
-      AtomicLong count = entry.getValue();
-
-      // Compute the next value by reducing it by the decayFactor
-      long currentValue = count.get();
-      long nextValue = (long)(currentValue * decayFactor);
-      total += nextValue;
-      count.set(nextValue);
-
-      if (nextValue == 0) {
-        // We will clean up unused keys here. An interesting optimization might
-        // be to have an upper bound on keyspace in callCounts and only
-        // clean once we pass it.
-        it.remove();
+    try {
+      long total = 0;
+      Iterator<Map.Entry<Object, AtomicLong>> it =
+          callCounts.entrySet().iterator();
+
+      while (it.hasNext()) {
+        Map.Entry<Object, AtomicLong> entry = it.next();
+        AtomicLong count = entry.getValue();
+
+        // Compute the next value by reducing it by the decayFactor
+        long currentValue = count.get();
+        long nextValue = (long) (currentValue * decayFactor);
+        total += nextValue;
+        count.set(nextValue);
+
+        if (nextValue == 0) {
+          // We will clean up unused keys here. An interesting optimization
+          // might be to have an upper bound on keyspace in callCounts and only
+          // clean once we pass it.
+          it.remove();
+        }
       }
-    }
 
-    // Update the total so that we remain in sync
-    totalCalls.set(total);
+      // Update the total so that we remain in sync
+      totalCalls.set(total);
+
+      // Now refresh the cache of scheduling decisions
+      recomputeScheduleCache();
 
-    // Now refresh the cache of scheduling decisions
-    recomputeScheduleCache();
+      // Update average response time with decay
+      updateAverageResponseTime(true);
+    } catch (Exception ex) {
+      LOG.error("decayCurrentCounts exception: " +
+          ExceptionUtils.getFullStackTrace(ex));
+      throw ex;
+    }
   }
 
   /**
@@ -324,7 +443,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
   /**
    * Given the number of occurrences, compute a scheduling decision.
    * @param occurrences how many occurrences
-   * @return scheduling decision from 0 to numQueues - 1
+   * @return scheduling decision from 0 to numLevels - 1
    */
   private int computePriorityLevel(long occurrences) {
     long totalCallSnapshot = totalCalls.get();
@@ -334,14 +453,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
       proportion = (double) occurrences / totalCallSnapshot;
     }
 
-    // Start with low priority queues, since they will be most common
-    for(int i = (numQueues - 1); i > 0; i--) {
+    // Start with low priority levels, since they will be most common
+    for(int i = (numLevels - 1); i > 0; i--) {
       if (proportion >= this.thresholds[i - 1]) {
-        return i; // We've found our queue number
+        return i; // We've found our level number
       }
     }
 
-    // If we get this far, we're at queue 0
+    // If we get this far, we're at level 0
     return 0;
   }
 
@@ -349,7 +468,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
    * Returns the priority level for a given identity by first trying the cache,
    * then computing it.
    * @param identity an object responding to toString and hashCode
-   * @return integer scheduling decision from 0 to numQueues - 1
+   * @return integer scheduling decision from 0 to numLevels - 1
    */
   private int cachedOrComputedPriorityLevel(Object identity) {
     try {
@@ -360,22 +479,29 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
       if (scheduleCache != null) {
         Integer priority = scheduleCache.get(identity);
         if (priority != null) {
+          LOG.debug("Cache priority for: {} with priority: {}", identity,
+              priority);
           return priority;
         }
       }
 
       // Cache was no good, compute it
-      return computePriorityLevel(occurrences);
+      int priority = computePriorityLevel(occurrences);
+      LOG.debug("compute priority for " + identity + " priority " + priority);
+      return priority;
+
     } catch (InterruptedException ie) {
-      LOG.warn("Caught InterruptedException, returning low priority queue");
-      return numQueues - 1;
+      LOG.warn("Caught InterruptedException, returning low priority level");
+      LOG.debug("Fallback priority for: {} with priority: {}", identity,
+          numLevels - 1);
+      return numLevels - 1;
     }
   }
 
   /**
    * Compute the appropriate priority for a schedulable based on past requests.
    * @param obj the schedulable obj to query and remember
-   * @return the queue index which we recommend scheduling in
+   * @return the level index which we recommend scheduling in
    */
   @Override
   public int getPriorityLevel(Schedulable obj) {
@@ -389,6 +515,73 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  @Override
+  public boolean shouldBackOff(Schedulable obj) {
+    Boolean backOff = false;
+    if (backOffByResponseTimeEnabled) {
+      int priorityLevel = obj.getPriorityLevel();
+      if (LOG.isDebugEnabled()) {
+        double[] responseTimes = getAverageResponseTime();
+        LOG.debug("Current Caller: {}  Priority: {} ",
+            obj.getUserGroupInformation().getUserName(),
+            obj.getPriorityLevel());
+        for (int i = 0; i < numLevels; i++) {
+          LOG.debug("Queue: {} responseTime: {} backoffThreshold: {}", i,
+              responseTimes[i], backOffResponseTimeThresholds[i]);
+        }
+      }
+      // High priority rpc over threshold triggers back off of low priority rpc
+      for (int i = 0; i < priorityLevel + 1; i++) {
+        if (responseTimeAvgInLastWindow.get(i) >
+            backOffResponseTimeThresholds[i]) {
+          backOff = true;
+          break;
+        }
+      }
+    }
+    return backOff;
+  }
+
+  @Override
+  public void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime) {
+    responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
+    responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
+        queueTime+processingTime);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("addResponseTime for call: {}  priority: {} queueTime: {} " +
+          "processingTime: {} ", name, priorityLevel, queueTime,
+          processingTime);
+    }
+  }
+
+  // Update the cached average response time at the end of decay window
+  void updateAverageResponseTime(boolean enableDecay) {
+    for (int i = 0; i < numLevels; i++) {
+      double averageResponseTime = 0;
+      long totalResponseTime = responseTimeTotalInCurrWindow.get(i);
+      long responseTimeCount = responseTimeCountInCurrWindow.get(i);
+      if (responseTimeCount > 0) {
+        averageResponseTime = (double) totalResponseTime / responseTimeCount;
+      }
+      final double lastAvg = responseTimeAvgInLastWindow.get(i);
+      if (enableDecay && lastAvg > 0.0) {
+        final double decayed = decayFactor * lastAvg + averageResponseTime;
+        responseTimeAvgInLastWindow.set(i, decayed);
+      } else {
+        responseTimeAvgInLastWindow.set(i, averageResponseTime);
+      }
+      responseTimeCountInLastWindow.set(i, responseTimeCount);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("updateAverageResponseTime queue: {} Average: {} Count: {}",
+            i, averageResponseTime, responseTimeCount);
+      }
+      // Reset for next decay window
+      responseTimeTotalInCurrWindow.set(i, 0);
+      responseTimeCountInCurrWindow.set(i, 0);
+    }
+  }
+
   // For testing
   @VisibleForTesting
   public double getDecayFactor() { return decayFactor; }
@@ -429,16 +622,21 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
 
     // Weakref for delegate, so we don't retain it forever if it can be GC'd
     private WeakReference<DecayRpcScheduler> delegate;
+    private double[] averageResponseTimeDefault;
+    private long[] callCountInLastWindowDefault;
 
-    private MetricsProxy(String namespace) {
+    private MetricsProxy(String namespace, int numLevels) {
+      averageResponseTimeDefault = new double[numLevels];
+      callCountInLastWindowDefault = new long[numLevels];
       MBeans.register(namespace, "DecayRpcScheduler", this);
     }
 
-    public static synchronized MetricsProxy getInstance(String namespace) {
+    public static synchronized MetricsProxy getInstance(String namespace,
+        int numLevels) {
       MetricsProxy mp = INSTANCES.get(namespace);
       if (mp == null) {
         // We must create one
-        mp = new MetricsProxy(namespace);
+        mp = new MetricsProxy(namespace, numLevels);
         INSTANCES.put(namespace, mp);
       }
       return mp;
@@ -487,6 +685,25 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
         return scheduler.getTotalCallVolume();
       }
     }
+
+    @Override
+    public double[] getAverageResponseTime() {
+      DecayRpcScheduler scheduler = delegate.get();
+      if (scheduler == null) {
+        return averageResponseTimeDefault;
+      } else {
+        return scheduler.getAverageResponseTime();
+      }
+    }
+
+    public long[] getResponseTimeCountInLastWindow() {
+      DecayRpcScheduler scheduler = delegate.get();
+      if (scheduler == null) {
+        return callCountInLastWindowDefault;
+      } else {
+        return scheduler.getResponseTimeCountInLastWindow();
+      }
+    }
   }
 
   public int getUniqueIdentityCount() {
@@ -497,6 +714,23 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
     return totalCalls.get();
   }
 
+  public long[] getResponseTimeCountInLastWindow() {
+    long[] ret = new long[responseTimeCountInLastWindow.length()];
+    for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
+      ret[i] = responseTimeCountInLastWindow.get(i);
+    }
+    return ret;
+  }
+
+  @Override
+  public double[] getAverageResponseTime() {
+    double[] ret = new double[responseTimeAvgInLastWindow.length()];
+    for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) {
+      ret[i] = responseTimeAvgInLastWindow.get(i);
+    }
+    return ret;
+  }
+
   public String getSchedulingDecisionSummary() {
     Map<Object, Integer> decisions = scheduleCacheRef.get();
     if (decisions == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
index 3481f19..fab9b93 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcSchedulerMXBean.java
@@ -27,4 +27,6 @@ public interface DecayRpcSchedulerMXBean {
   String getCallVolumeSummary();
   int getUniqueIdentityCount();
   long getTotalCallVolume();
+  double[] getAverageResponseTime();
+  long[] getResponseTimeCountInLastWindow();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
new file mode 100644
index 0000000..08f74d4
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultRpcScheduler.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * No op default RPC scheduler.
+ */
+public class DefaultRpcScheduler implements RpcScheduler {
+  @Override
+  public int getPriorityLevel(Schedulable obj) {
+    return 0;
+  }
+
+  @Override
+  public boolean shouldBackOff(Schedulable obj) {
+    return false;
+  }
+
+  @Override
+  public void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime) {
+  }
+
+  public DefaultRpcScheduler(int priorityLevels, String namespace,
+      Configuration conf) {
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
index 0b56243..435c454 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java
@@ -44,8 +44,9 @@ import org.apache.hadoop.metrics2.util.MBeans;
  */
 public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   implements BlockingQueue<E> {
-  // Configuration Keys
+  @Deprecated
   public static final int    IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4;
+  @Deprecated
   public static final String IPC_CALLQUEUE_PRIORITY_LEVELS_KEY =
     "faircallqueue.priority-levels";
 
@@ -66,9 +67,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     }
   }
 
-  /* Scheduler picks which queue to place in */
-  private RpcScheduler scheduler;
-
   /* Multiplexer picks which queue to draw from */
   private RpcMultiplexer multiplexer;
 
@@ -83,8 +81,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    * Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum
    * capacity of `capacity` and a maximum capacity of `capacity * number_queues`
    */
-  public FairCallQueue(int capacity, String ns, Configuration conf) {
-    int numQueues = parseNumQueues(ns, conf);
+  public FairCallQueue(int priorityLevels, int capacity, String ns,
+      Configuration conf) {
+    if(priorityLevels < 1) {
+      throw new IllegalArgumentException("Number of Priority Levels must be " +
+          "at least 1");
+    }
+    int numQueues = priorityLevels;
     LOG.info("FairCallQueue is in use with " + numQueues + " queues.");
 
     this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
@@ -95,29 +98,13 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
       this.overflowedCalls.add(new AtomicLong(0));
     }
 
-    this.scheduler = new DecayRpcScheduler(numQueues, ns, conf);
     this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
-
     // Make this the active source of metrics
     MetricsProxy mp = MetricsProxy.getInstance(ns);
     mp.setDelegate(this);
   }
 
   /**
-   * Read the number of queues from the configuration.
-   * This will affect the FairCallQueue's overall capacity.
-   * @throws IllegalArgumentException on invalid queue count
-   */
-  private static int parseNumQueues(String ns, Configuration conf) {
-    int retval = conf.getInt(ns + "." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY,
-      IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT);
-    if(retval < 1) {
-      throw new IllegalArgumentException("numQueues must be at least 1");
-    }
-    return retval;
-  }
-
-  /**
    * Returns the first non-empty queue with equal or lesser priority
    * than <i>startIdx</i>. Wraps around, searching a maximum of N
    * queues, where N is this.queues.size().
@@ -144,7 +131,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
 
   /**
    * Put and offer follow the same pattern:
-   * 1. Get a priorityLevel from the scheduler
+   * 1. Get the assigned priorityLevel from the call by scheduler
    * 2. Get the nth sub-queue matching this priorityLevel
    * 3. delegate the call to this sub-queue.
    *
@@ -154,7 +141,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
    */
   @Override
   public void put(E e) throws InterruptedException {
-    int priorityLevel = scheduler.getPriorityLevel(e);
+    int priorityLevel = e.getPriorityLevel();
 
     final int numLevels = this.queues.size();
     while (true) {
@@ -185,7 +172,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   @Override
   public boolean offer(E e, long timeout, TimeUnit unit)
       throws InterruptedException {
-    int priorityLevel = scheduler.getPriorityLevel(e);
+    int priorityLevel = e.getPriorityLevel();
     BlockingQueue<E> q = this.queues.get(priorityLevel);
     boolean ret = q.offer(e, timeout, unit);
 
@@ -196,7 +183,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
 
   @Override
   public boolean offer(E e) {
-    int priorityLevel = scheduler.getPriorityLevel(e);
+    int priorityLevel = e.getPriorityLevel();
     BlockingQueue<E> q = this.queues.get(priorityLevel);
     boolean ret = q.offer(e);
 
@@ -436,12 +423,6 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     return calls;
   }
 
-  // For testing
-  @VisibleForTesting
-  public void setScheduler(RpcScheduler newScheduler) {
-    this.scheduler = newScheduler;
-  }
-
   @VisibleForTesting
   public void setMultiplexer(RpcMultiplexer newMux) {
     this.multiplexer = newMux;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
index 692d2b6..071e2e8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java
@@ -654,13 +654,7 @@ public class ProtobufRpcEngine implements RpcEngine {
           String detailedMetricsName = (exception == null) ?
               methodName :
               exception.getClass().getSimpleName();
-          server.rpcMetrics.addRpcQueueTime(qTime);
-          server.rpcMetrics.addRpcProcessingTime(processingTime);
-          server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
-              processingTime);
-          if (server.isLogSlowRPC()) {
-            server.logSlowRpcCalls(methodName, processingTime);
-          }
+          server.updateMetrics(detailedMetricsName, qTime, processingTime);
         }
         return new RpcResponseWrapper(result);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
index a155706..6f93b22 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java
@@ -19,11 +19,17 @@
 package org.apache.hadoop.ipc;
 
 /**
- * Implement this interface to be used for RPC scheduling in the fair call queues.
+ * Implement this interface to be used for RPC scheduling and backoff.
+ *
  */
 public interface RpcScheduler {
   /**
    * Returns priority level greater than zero as a hint for scheduling.
    */
   int getPriorityLevel(Schedulable obj);
+
+  boolean shouldBackOff(Schedulable obj);
+
+  void addResponseTime(String name, int priorityLevel, int queueTime,
+      int processingTime);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
index 38f3518..3b28d85 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Schedulable.java
@@ -18,11 +18,8 @@
 
 package org.apache.hadoop.ipc;
 
-import java.nio.ByteBuffer;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Interface which allows extracting information necessary to
@@ -31,4 +28,6 @@ import org.apache.hadoop.io.Writable;
 @InterfaceAudience.Private
 public interface Schedulable {
   public UserGroupInformation getUserGroupInformation();
+
+  int getPriorityLevel();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 1d92865..eb28ad5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -396,6 +396,15 @@ public abstract class Server {
     return CurCall.get() != null;
   }
 
+  /**
+   * Return the priority level assigned by call queue to an RPC
+   * Returns 0 in case no priority is assigned.
+   */
+  public static int getPriorityLevel() {
+    Call call = CurCall.get();
+    return call != null? call.getPriorityLevel() : 0;
+  }
+
   private String bindAddress; 
   private int port;                               // port we listen on
   private int handlerCount;                       // number of handler threads
@@ -482,6 +491,18 @@ public abstract class Server {
     }
   }
 
+  void updateMetrics(String name, int queueTime, int processingTime) {
+    rpcMetrics.addRpcQueueTime(queueTime);
+    rpcMetrics.addRpcProcessingTime(processingTime);
+    rpcDetailedMetrics.addProcessingTime(name, processingTime);
+    callQueue.addResponseTime(name, getPriorityLevel(), queueTime,
+        processingTime);
+
+    if (isLogSlowRPC()) {
+      logSlowRpcCalls(name, processingTime);
+    }
+  }
+
   /**
    * A convenience method to bind to a given address and report 
    * better exceptions if the address is not a valid host.
@@ -578,6 +599,10 @@ public abstract class Server {
     return serviceAuthorizationManager;
   }
 
+  private String getQueueClassPrefix() {
+    return CommonConfigurationKeys.IPC_NAMESPACE + "." + port;
+  }
+
   static Class<? extends BlockingQueue<Call>> getQueueClass(
       String prefix, Configuration conf) {
     String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
@@ -585,8 +610,29 @@ public abstract class Server {
     return CallQueueManager.convertQueueClass(queueClass, Call.class);
   }
 
-  private String getQueueClassPrefix() {
-    return CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE + "." + port;
+  static Class<? extends RpcScheduler> getSchedulerClass(
+      String prefix, Configuration conf) {
+    String schedulerKeyname = prefix + "." + CommonConfigurationKeys
+        .IPC_SCHEDULER_IMPL_KEY;
+    Class<?> schedulerClass = conf.getClass(schedulerKeyname, null);
+    // Patch the configuration for legacy fcq configuration that does not have
+    // a separate scheduler setting
+    if (schedulerClass == null) {
+      String queueKeyName = prefix + "." + CommonConfigurationKeys
+          .IPC_CALLQUEUE_IMPL_KEY;
+      Class<?> queueClass = conf.getClass(queueKeyName, null);
+      if (queueClass != null) {
+        if (queueClass.getCanonicalName().equals(
+            FairCallQueue.class.getCanonicalName())) {
+          conf.setClass(schedulerKeyname, DecayRpcScheduler.class,
+              RpcScheduler.class);
+        }
+      }
+    }
+    schedulerClass = conf.getClass(schedulerKeyname,
+        DefaultRpcScheduler.class);
+
+    return CallQueueManager.convertSchedulerClass(schedulerClass);
   }
 
   /*
@@ -595,7 +641,8 @@ public abstract class Server {
   public synchronized void refreshCallQueue(Configuration conf) {
     // Create the next queue
     String prefix = getQueueClassPrefix();
-    callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
+    callQueue.swapQueue(getSchedulerClass(prefix, conf),
+        getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
   }
 
   /**
@@ -623,6 +670,8 @@ public abstract class Server {
     private final byte[] clientId;
     private final TraceScope traceScope; // the HTrace scope on the server side
     private final CallerContext callerContext; // the call context
+    private int priorityLevel;
+    // the priority level assigned by scheduler, 0 by default
 
     private Call(Call call) {
       this(call.callId, call.retryCount, call.rpcRequest, call.connection,
@@ -709,7 +758,16 @@ public abstract class Server {
     @Override
     public UserGroupInformation getUserGroupInformation() {
       return connection.user;
-    }    
+    }
+
+    @Override
+    public int getPriorityLevel() {
+      return this.priorityLevel;
+    }
+
+    public void setPriorityLevel(int priorityLevel) {
+      this.priorityLevel = priorityLevel;
+    }
   }
 
   /** Listens on the socket. Creates jobs for the handler threads*/
@@ -2151,6 +2209,9 @@ public abstract class Server {
           rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
           header.getClientId().toByteArray(), traceScope, callerContext);
 
+      // Save the priority level assignment by the scheduler
+      call.setPriorityLevel(callQueue.getPriorityLevel(call));
+
       if (callQueue.isClientBackoffEnabled()) {
         // if RPC queue is full, we will ask the RPC client to back off by
         // throwing RetriableException. Whether RPC client will honor
@@ -2166,9 +2227,10 @@ public abstract class Server {
 
     private void queueRequestOrAskClientToBackOff(Call call)
         throws WrappedRpcServerException, InterruptedException {
-      // If rpc queue is full, we will ask the client to back off.
-      boolean isCallQueued = callQueue.offer(call);
-      if (!isCallQueued) {
+      // If rpc scheduler indicates back off based on performance
+      // degradation such as response time or rpc queue is full,
+      // we will ask the client to back off.
+      if (callQueue.shouldBackOff(call) || !callQueue.offer(call)) {
         rpcMetrics.incrClientBackoff();
         RetriableException retriableException =
             new RetriableException("Server is too busy.");
@@ -2513,6 +2575,7 @@ public abstract class Server {
     // Setup appropriate callqueue
     final String prefix = getQueueClassPrefix();
     this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
+        getSchedulerClass(prefix, conf),
         getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
 
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
index a1db6be..a9dbb41 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.io.*;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -502,13 +501,12 @@ public class WritableRpcEngine implements RpcEngine {
             }
           }
         }
-          
 
-          // Invoke the protocol method
-       long startTime = Time.now();
-       int qTime = (int) (startTime-receivedTime);
-       Exception exception = null;
-       try {
+        // Invoke the protocol method
+        long startTime = Time.now();
+        int qTime = (int) (startTime-receivedTime);
+        Exception exception = null;
+        try {
           Method method =
               protocolImpl.protocolClass.getMethod(call.getMethodName(),
               call.getParameterClasses());
@@ -539,27 +537,20 @@ public class WritableRpcEngine implements RpcEngine {
           exception = ioe;
           throw ioe;
         } finally {
-         int processingTime = (int) (Time.now() - startTime);
-         if (LOG.isDebugEnabled()) {
-           String msg = "Served: " + call.getMethodName() +
-               " queueTime= " + qTime +
-               " procesingTime= " + processingTime;
-           if (exception != null) {
-             msg += " exception= " + exception.getClass().getSimpleName();
-           }
-           LOG.debug(msg);
-         }
-         String detailedMetricsName = (exception == null) ?
-             call.getMethodName() :
-             exception.getClass().getSimpleName();
-         server.rpcMetrics.addRpcQueueTime(qTime);
-         server.rpcMetrics.addRpcProcessingTime(processingTime);
-         server.rpcDetailedMetrics.addProcessingTime(detailedMetricsName,
-             processingTime);
-          if (server.isLogSlowRPC()) {
-            server.logSlowRpcCalls(call.getMethodName(), processingTime);
+          int processingTime = (int) (Time.now() - startTime);
+          if (LOG.isDebugEnabled()) {
+            String msg = "Served: " + call.getMethodName() +
+                " queueTime= " + qTime + " procesingTime= " + processingTime;
+            if (exception != null) {
+              msg += " exception= " + exception.getClass().getSimpleName();
+            }
+            LOG.debug(msg);
           }
-       }
+          String detailedMetricsName = (exception == null) ?
+              call.getMethodName() :
+              exception.getClass().getSimpleName();
+          server.updateMetrics(detailedMetricsName, qTime, processingTime);
+        }
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
index 4d659ac..af9ce1b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java
@@ -27,17 +27,37 @@ import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.Test;
 
 public class TestCallQueueManager {
   private CallQueueManager<FakeCall> manager;
+  private Configuration conf = new Configuration();
 
-  public class FakeCall {
+  public class FakeCall implements Schedulable {
     public final int tag; // Can be used for unique identification
-
+    private int priorityLevel;
+    UserGroupInformation fakeUgi = UserGroupInformation.createRemoteUser
+        ("fakeUser");
     public FakeCall(int tag) {
       this.tag = tag;
     }
+
+    @Override
+    public UserGroupInformation getUserGroupInformation() {
+      return fakeUgi;
+    }
+
+    @Override
+    public int getPriorityLevel() {
+      return priorityLevel;
+    }
+
+    public void setPriorityLevel(int level) {
+      this.priorityLevel = level;
+    }
   }
 
   /**
@@ -62,7 +82,9 @@ public class TestCallQueueManager {
       try {
         // Fill up to max (which is infinite if maxCalls < 0)
         while (isRunning && (callsAdded < maxCalls || maxCalls < 0)) {
-          cq.put(new FakeCall(this.tag));
+          FakeCall call = new FakeCall(this.tag);
+          call.setPriorityLevel(cq.getPriorityLevel(call));
+          cq.put(call);
           callsAdded++;
         }
       } catch (InterruptedException e) {
@@ -135,7 +157,7 @@ public class TestCallQueueManager {
     t.start();
     t.join(100);
 
-    assertEquals(putter.callsAdded, numberOfPuts);
+    assertEquals(numberOfPuts, putter.callsAdded);
     t.interrupt();
   }
 
@@ -143,23 +165,90 @@ public class TestCallQueueManager {
   private static final Class<? extends BlockingQueue<FakeCall>> queueClass
       = CallQueueManager.convertQueueClass(LinkedBlockingQueue.class, FakeCall.class);
 
+  private static final Class<? extends RpcScheduler> schedulerClass
+      = CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class);
+
   @Test
   public void testCallQueueCapacity() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+        10, "", conf);
 
     assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
   }
 
   @Test
   public void testEmptyConsume() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+        10, "", conf);
 
     assertCanTake(manager, 0, 1); // Fails since it's empty
   }
 
+  static Class<? extends BlockingQueue<FakeCall>> getQueueClass(
+      String prefix, Configuration conf) {
+    String name = prefix + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY;
+    Class<?> queueClass = conf.getClass(name, LinkedBlockingQueue.class);
+    return CallQueueManager.convertQueueClass(queueClass, FakeCall.class);
+  }
+
+  @Test
+  public void testFcqBackwardCompatibility() throws InterruptedException {
+    // Test BackwardCompatibility to ensure existing FCQ deployment still
+    // work without explicitly specifying DecayRpcScheduler
+    Configuration conf = new Configuration();
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+
+    final String queueClassName = "org.apache.hadoop.ipc.FairCallQueue";
+    conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IMPL_KEY,
+        queueClassName);
+
+    // Specify only Fair Call Queue without a scheduler
+    // Ensure the DecayScheduler will be added to avoid breaking.
+    Class<? extends RpcScheduler> scheduler = Server.getSchedulerClass(ns,
+        conf);
+    assertTrue(scheduler.getCanonicalName().
+        equals("org.apache.hadoop.ipc.DecayRpcScheduler"));
+
+    Class<? extends BlockingQueue<FakeCall>> queue =
+        (Class<? extends BlockingQueue<FakeCall>>) getQueueClass(ns, conf);
+    assertTrue(queue.getCanonicalName().equals(queueClassName));
+
+    manager = new CallQueueManager<FakeCall>(queue, scheduler, false,
+        2, "", conf);
+
+    // Default FCQ has 4 levels and the max capacity is 2 x 4
+    assertCanPut(manager, 3, 3);
+  }
+
+  @Test
+  public void testSchedulerWithoutFCQ() throws InterruptedException {
+    Configuration conf = new Configuration();
+    // Test DecayedRpcScheduler without FCQ
+    // Ensure the default LinkedBlockingQueue can work with DecayedRpcScheduler
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    final String schedulerClassName = "org.apache.hadoop.ipc.DecayRpcScheduler";
+    conf.setStrings(ns + "." + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
+        schedulerClassName);
+
+    Class<? extends BlockingQueue<FakeCall>> queue =
+        (Class<? extends BlockingQueue<FakeCall>>) getQueueClass(ns, conf);
+    assertTrue(queue.getCanonicalName().equals("java.util.concurrent." +
+        "LinkedBlockingQueue"));
+
+    manager = new CallQueueManager<FakeCall>(queue,
+        Server.getSchedulerClass(ns, conf), false,
+        3, "", conf);
+
+    // LinkedBlockingQueue with a capacity of 3 can put 3 calls
+    assertCanPut(manager, 3, 3);
+    // LinkedBlockingQueue with a capacity of 3 can't put 1 more call
+    assertCanPut(manager, 0, 1);
+  }
+
   @Test(timeout=60000)
   public void testSwapUnderContention() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
+        5000, "", conf);
 
     ArrayList<Putter> producers = new ArrayList<Putter>();
     ArrayList<Taker> consumers = new ArrayList<Taker>();
@@ -188,7 +277,7 @@ public class TestCallQueueManager {
     Thread.sleep(500);
 
     for (int i=0; i < 5; i++) {
-      manager.swapQueue(queueClass, 5000, "", null);
+      manager.swapQueue(schedulerClass, queueClass, 5000, "", conf);
     }
 
     // Stop the producers
@@ -223,24 +312,50 @@ public class TestCallQueueManager {
   }
 
   public static class ExceptionFakeCall {
-
     public ExceptionFakeCall() {
-      throw new IllegalArgumentException("Exception caused by constructor.!!");
+      throw new IllegalArgumentException("Exception caused by call queue " +
+          "constructor.!!");
+    }
+  }
+
+  public static class ExceptionFakeScheduler {
+    public ExceptionFakeScheduler() {
+      throw new IllegalArgumentException("Exception caused by " +
+          "scheduler constructor.!!");
     }
   }
 
-  private static final Class<? extends BlockingQueue<ExceptionFakeCall>> exceptionQueueClass = CallQueueManager
-      .convertQueueClass(ExceptionFakeCall.class, ExceptionFakeCall.class);
+  private static final Class<? extends RpcScheduler>
+      exceptionSchedulerClass = CallQueueManager.convertSchedulerClass(
+      ExceptionFakeScheduler.class);
+
+  private static final Class<? extends BlockingQueue<ExceptionFakeCall>>
+      exceptionQueueClass = CallQueueManager.convertQueueClass(
+      ExceptionFakeCall.class, ExceptionFakeCall.class);
+
+  @Test
+  public void testCallQueueConstructorException() throws InterruptedException {
+    try {
+      new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass,
+          schedulerClass, false, 10, "", new Configuration());
+      fail();
+    } catch (RuntimeException re) {
+      assertTrue(re.getCause() instanceof IllegalArgumentException);
+      assertEquals("Exception caused by call queue constructor.!!", re
+          .getCause()
+          .getMessage());
+    }
+  }
 
   @Test
-  public void testInvocationException() throws InterruptedException {
+  public void testSchedulerConstructorException() throws InterruptedException {
     try {
-      new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass, false, 10,
-          "", null);
+      new CallQueueManager<FakeCall>(queueClass, exceptionSchedulerClass,
+          false, 10, "", new Configuration());
       fail();
     } catch (RuntimeException re) {
       assertTrue(re.getCause() instanceof IllegalArgumentException);
-      assertEquals("Exception caused by constructor.!!", re.getCause()
+      assertEquals("Exception caused by scheduler constructor.!!", re.getCause()
           .getMessage());
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
index edc3b00..0b0408c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java
@@ -18,12 +18,11 @@
 
 package org.apache.hadoop.ipc;
 
+import static java.lang.Thread.sleep;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.Arrays;
 import org.junit.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -31,8 +30,6 @@ import static org.mockito.Mockito.when;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
     Schedulable mockCall = mock(Schedulable.class);
@@ -57,30 +54,32 @@ public class TestDecayRpcScheduler {
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testParsePeriod() {
     // By default
     scheduler = new DecayRpcScheduler(1, "", new Configuration());
-    assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT,
+    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT,
       scheduler.getDecayPeriodMillis());
 
     // Custom
     Configuration conf = new Configuration();
-    conf.setLong("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
+    conf.setLong("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
       1058);
     scheduler = new DecayRpcScheduler(1, "ns", conf);
     assertEquals(1058L, scheduler.getDecayPeriodMillis());
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testParseFactor() {
     // Default
     scheduler = new DecayRpcScheduler(1, "", new Configuration());
-    assertEquals(DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT,
+    assertEquals(DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT,
       scheduler.getDecayFactor(), 0.00001);
 
     // Custom
     Configuration conf = new Configuration();
-    conf.set("prefix." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
+    conf.set("prefix." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY,
       "0.125");
     scheduler = new DecayRpcScheduler(1, "prefix", conf);
     assertEquals(0.125, scheduler.getDecayFactor(), 0.00001);
@@ -94,6 +93,7 @@ public class TestDecayRpcScheduler {
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testParseThresholds() {
     // Defaults vary by number of queues
     Configuration conf = new Configuration();
@@ -111,16 +111,17 @@ public class TestDecayRpcScheduler {
 
     // Custom
     conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
       "1, 10, 20, 50, 85");
     scheduler = new DecayRpcScheduler(6, "ns", conf);
     assertEqualDecimalArrays(new double[]{0.01, 0.1, 0.2, 0.5, 0.85}, scheduler.getThresholds());
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testAccumulate() {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
     assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
@@ -138,10 +139,11 @@ public class TestDecayRpcScheduler {
   }
 
   @Test
-  public void testDecay() {
+  @SuppressWarnings("deprecation")
+  public void testDecay() throws Exception {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
@@ -150,6 +152,8 @@ public class TestDecayRpcScheduler {
       scheduler.getPriorityLevel(mockCall("A"));
     }
 
+    sleep(1000);
+
     for (int i = 0; i < 8; i++) {
       scheduler.getPriorityLevel(mockCall("B"));
     }
@@ -184,10 +188,11 @@ public class TestDecayRpcScheduler {
   }
 
   @Test
+  @SuppressWarnings("deprecation")
   public void testPriority() {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY,
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY,
       "25, 50, 75");
     scheduler = new DecayRpcScheduler(4, "ns", conf);
 
@@ -204,10 +209,11 @@ public class TestDecayRpcScheduler {
   }
 
   @Test(timeout=2000)
+  @SuppressWarnings("deprecation")
   public void testPeriodic() throws InterruptedException {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY, "10");
-    conf.set("ns." + DecayRpcScheduler.IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "10");
+    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
     assertEquals(10, scheduler.getDecayPeriodMillis());
@@ -219,7 +225,7 @@ public class TestDecayRpcScheduler {
 
     // It should eventually decay to zero
     while (scheduler.getTotalCallSnapshot() > 0) {
-      Thread.sleep(10);
+      sleep(10);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
index 2694ba3..4a8ad3b 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java
@@ -37,21 +37,24 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
 import org.mockito.Matchers;
 
-import static org.apache.hadoop.ipc.FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY;
-
 public class TestFairCallQueue extends TestCase {
   private FairCallQueue<Schedulable> fcq;
 
-  private Schedulable mockCall(String id) {
+  private Schedulable mockCall(String id, int priority) {
     Schedulable mockCall = mock(Schedulable.class);
     UserGroupInformation ugi = mock(UserGroupInformation.class);
 
     when(ugi.getUserName()).thenReturn(id);
     when(mockCall.getUserGroupInformation()).thenReturn(ugi);
+    when(mockCall.getPriorityLevel()).thenReturn(priority);
 
     return mockCall;
   }
 
+  private Schedulable mockCall(String id) {
+    return mockCall(id, 0);
+  }
+
   // A scheduler which always schedules into priority zero
   private RpcScheduler alwaysZeroScheduler;
   {
@@ -60,11 +63,12 @@ public class TestFairCallQueue extends TestCase {
     alwaysZeroScheduler = sched;
   }
 
+  @SuppressWarnings("deprecation")
   public void setUp() {
     Configuration conf = new Configuration();
-    conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+    conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
 
-    fcq = new FairCallQueue<Schedulable>(5, "ns", conf);
+    fcq = new FairCallQueue<Schedulable>(2, 5, "ns", conf);
   }
 
   //
@@ -85,7 +89,6 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testOfferSucceeds() {
-    fcq.setScheduler(alwaysZeroScheduler);
 
     for (int i = 0; i < 5; i++) {
       // We can fit 10 calls
@@ -96,7 +99,6 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testOfferFailsWhenFull() {
-    fcq.setScheduler(alwaysZeroScheduler);
     for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
 
     assertFalse(fcq.offer(mockCall("c"))); // It's full
@@ -107,11 +109,10 @@ public class TestFairCallQueue extends TestCase {
   public void testOfferSucceedsWhenScheduledLowPriority() {
     // Scheduler will schedule into queue 0 x 5, then queue 1
     RpcScheduler sched = mock(RpcScheduler.class);
-    when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 0);
-    fcq.setScheduler(sched);
-    for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c"))); }
+    int mockedPriorities[] = {0, 0, 0, 0, 0, 1, 0};
+    for (int i = 0; i < 5; i++) { assertTrue(fcq.offer(mockCall("c", mockedPriorities[i]))); }
 
-    assertTrue(fcq.offer(mockCall("c")));
+    assertTrue(fcq.offer(mockCall("c", mockedPriorities[5])));
 
     assertEquals(6, fcq.size());
   }
@@ -121,7 +122,7 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testPeekNonDestructive() {
-    Schedulable call = mockCall("c");
+    Schedulable call = mockCall("c", 0);
     assertTrue(fcq.offer(call));
 
     assertEquals(call, fcq.peek());
@@ -130,8 +131,8 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testPeekPointsAtHead() {
-    Schedulable call = mockCall("c");
-    Schedulable next = mockCall("b");
+    Schedulable call = mockCall("c", 0);
+    Schedulable next = mockCall("b", 0);
     fcq.offer(call);
     fcq.offer(next);
 
@@ -139,15 +140,11 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testPollTimeout() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
-
     assertNull(fcq.poll(10, TimeUnit.MILLISECONDS));
   }
 
   public void testPollSuccess() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
-
-    Schedulable call = mockCall("c");
+    Schedulable call = mockCall("c", 0);
     assertTrue(fcq.offer(call));
 
     assertEquals(call, fcq.poll(10, TimeUnit.MILLISECONDS));
@@ -156,7 +153,6 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testOfferTimeout() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
     for (int i = 0; i < 5; i++) {
       assertTrue(fcq.offer(mockCall("c"), 10, TimeUnit.MILLISECONDS));
     }
@@ -166,13 +162,11 @@ public class TestFairCallQueue extends TestCase {
     assertEquals(5, fcq.size());
   }
 
+  @SuppressWarnings("deprecation")
   public void testDrainTo() {
     Configuration conf = new Configuration();
-    conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
-    FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
-
-    fcq.setScheduler(alwaysZeroScheduler);
-    fcq2.setScheduler(alwaysZeroScheduler);
+    conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+    FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
 
     // Start with 3 in fcq, to be drained
     for (int i = 0; i < 3; i++) {
@@ -185,13 +179,11 @@ public class TestFairCallQueue extends TestCase {
     assertEquals(3, fcq2.size());
   }
 
+  @SuppressWarnings("deprecation")
   public void testDrainToWithLimit() {
     Configuration conf = new Configuration();
-    conf.setInt("ns." + IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
-    FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(10, "ns", conf);
-
-    fcq.setScheduler(alwaysZeroScheduler);
-    fcq2.setScheduler(alwaysZeroScheduler);
+    conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
+    FairCallQueue<Schedulable> fcq2 = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
 
     // Start with 3 in fcq, to be drained
     for (int i = 0; i < 3; i++) {
@@ -209,27 +201,23 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testFirstQueueFullRemainingCapacity() {
-    fcq.setScheduler(alwaysZeroScheduler);
     while (fcq.offer(mockCall("c"))) ; // Queue 0 will fill up first, then queue 1
 
     assertEquals(5, fcq.remainingCapacity());
   }
 
   public void testAllQueuesFullRemainingCapacity() {
-    RpcScheduler sched = mock(RpcScheduler.class);
-    when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 0, 0, 0, 0, 1, 1, 1, 1, 1);
-    fcq.setScheduler(sched);
-    while (fcq.offer(mockCall("c"))) ;
+    int[] mockedPriorities = {0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 0};
+    int i = 0;
+    while (fcq.offer(mockCall("c", mockedPriorities[i++]))) ;
 
     assertEquals(0, fcq.remainingCapacity());
     assertEquals(10, fcq.size());
   }
 
   public void testQueuesPartialFilledRemainingCapacity() {
-    RpcScheduler sched = mock(RpcScheduler.class);
-    when(sched.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(0, 1, 0, 1, 0);
-    fcq.setScheduler(sched);
-    for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c")); }
+    int[] mockedPriorities = {0, 1, 0, 1, 0};
+    for (int i = 0; i < 5; i++) { fcq.offer(mockCall("c", mockedPriorities[i])); }
 
     assertEquals(5, fcq.remainingCapacity());
     assertEquals(5, fcq.size());
@@ -351,16 +339,12 @@ public class TestFairCallQueue extends TestCase {
 
   // Make sure put will overflow into lower queues when the top is full
   public void testPutOverflows() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
-
     // We can fit more than 5, even though the scheduler suggests the top queue
     assertCanPut(fcq, 8, 8);
     assertEquals(8, fcq.size());
   }
 
   public void testPutBlocksWhenAllFull() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
-
     assertCanPut(fcq, 10, 10); // Fill up
     assertEquals(10, fcq.size());
 
@@ -369,12 +353,10 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testTakeBlocksWhenEmpty() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
     assertCanTake(fcq, 0, 1);
   }
 
   public void testTakeRemovesCall() throws InterruptedException {
-    fcq.setScheduler(alwaysZeroScheduler);
     Schedulable call = mockCall("c");
     fcq.offer(call);
 
@@ -383,17 +365,14 @@ public class TestFairCallQueue extends TestCase {
   }
 
   public void testTakeTriesNextQueue() throws InterruptedException {
-    // Make a FCQ filled with calls in q 1 but empty in q 0
-    RpcScheduler q1Scheduler = mock(RpcScheduler.class);
-    when(q1Scheduler.getPriorityLevel(Matchers.<Schedulable>any())).thenReturn(1);
-    fcq.setScheduler(q1Scheduler);
 
     // A mux which only draws from q 0
     RpcMultiplexer q0mux = mock(RpcMultiplexer.class);
     when(q0mux.getAndAdvanceCurrentIndex()).thenReturn(0);
     fcq.setMultiplexer(q0mux);
 
-    Schedulable call = mockCall("c");
+    // Make a FCQ filled with calls in q 1 but empty in q 0
+    Schedulable call = mockCall("c", 1);
     fcq.put(call);
 
     // Take from q1 even though mux said q0, since q0 empty

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d95c6eb3/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
index 1fa0fff..2638412 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIdentityProviders.java
@@ -19,25 +19,15 @@
 package org.apache.hadoop.ipc;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
-import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Test;
-import org.junit.Before;
-import org.junit.After;
 
 import java.util.List;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
-import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.io.Writable;
 
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.conf.Configuration;
@@ -55,16 +45,20 @@ public class TestIdentityProviders {
       }
     }
 
+    @Override
+    public int getPriorityLevel() {
+      return 0;
+    }
   }
 
   @Test
   public void testPluggableIdentityProvider() {
     Configuration conf = new Configuration();
-    conf.set(CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+    conf.set(CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
       "org.apache.hadoop.ipc.UserIdentityProvider");
 
     List<IdentityProvider> providers = conf.getInstances(
-      CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
+      CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
       IdentityProvider.class);
 
     assertTrue(providers.size() == 1);


Mime
View raw message