hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-16023 Fastpath for the FIFO rpcscheduler Adds an executor that does balanced queue and fast path handing off requests directly to waiting handlers if any present. Idea taken from Apache Kudu (incubating). See https://gerrit.clouder
Date Wed, 15 Jun 2016 00:42:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/master fa50d456a -> ee86e91e7


HBASE-16023 Fastpath for the FIFO rpcscheduler Adds an executor that does balanced queue and
fast path handing off requests directly to waiting handlers if any present. Idea taken from
Apache Kudu (incubating). See https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
 Refactor which makes a Handler type. Put all 'handler' stuff inside this
 new type. Also make it so subclass can provide its own Handler type.

M hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 Name the handler threads for their type so can tell if configs are
 having an effect.

Signed-off-by: stack <stack@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee86e91e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee86e91e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee86e91e

Branch: refs/heads/master
Commit: ee86e91e7eb3d647c89db496baa12a332f2c0c18
Parents: fa50d45
Author: stack <stack@apache.org>
Authored: Tue Jun 14 11:18:34 2016 -0700
Committer: stack <stack@apache.org>
Committed: Tue Jun 14 17:42:01 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/BalancedQueueRpcExecutor.java     |   5 +-
 ...ifoWithFastPathBalancedQueueRpcExecutor.java | 116 ++++++++++++++
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    | 160 ++++++++++++-------
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  39 +++--
 .../hbase/ipc/TestSimpleRpcScheduler.java       |   3 +-
 5 files changed, 241 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ee86e91e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
index 3505221..241d36e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/BalancedQueueRpcExecutor.java
@@ -33,7 +33,8 @@ import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
  * An {@link RpcExecutor} that will balance requests evenly across all its queues, but still
remains
- * efficient with a single queue via an inlinable queue balancing mechanism.
+ * efficient with a single queue via an inlinable queue balancing mechanism. Defaults to
FIFO but
+ * you can pass an alternate queue class to use.
  */
 @InterfaceAudience.LimitedPrivate({ HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX
})
 @InterfaceStability.Evolving
@@ -103,4 +104,4 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
   public List<BlockingQueue<CallRunner>> getQueues() {
     return queues;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee86e91e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..1951dd0
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
+ * ordering so a fast path skipping the queuing of Calls if a Handler is available, is possible.
+ * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
+ * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See
+ * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
+ */
+@InterfaceAudience.Private
+public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
+  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
+
+  /*
+   * Stack of Handlers waiting for work.
+   */
+  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+
+  public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
+      final int numQueues, final int maxQueueLength, final Configuration conf,
+      final Abortable abortable) {
+    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
+        maxQueueLength);
+  }
+
+  @Override
+  protected Handler getHandler(String name, double handlerFailureThreshhold,
+      BlockingQueue<CallRunner> q) {
+    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+  }
+
+  @Override
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
+    FastPathHandler handler = popReadyHandler();
+    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
+  }
+
+  /**
+   * @return Pop a Handler instance if one available ready-to-go or else return null.
+   */
+  private FastPathHandler popReadyHandler() {
+    return this.fastPathHandlerStack.poll();
+  }
+
+  class FastPathHandler extends Handler {
+    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack
Deque
+    // if an empty queue of CallRunners so we are available for direct handoff when one comes
in.
+    final Deque<FastPathHandler> fastPathHandlerStack;
+    // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+    private Semaphore semaphore = new Semaphore(1);
+    // The task we get when fast-pathing.
+    private CallRunner loadedCallRunner;
+
+    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner>
q,
+        final Deque<FastPathHandler> fastPathHandlerStack) {
+      super(name, handlerFailureThreshhold, q);
+      this.fastPathHandlerStack = fastPathHandlerStack;
+      this.semaphore.drainPermits();
+    }
+
+    protected CallRunner getCallRunner() throws InterruptedException {
+      // Get a callrunner if one in the Q.
+      CallRunner cr = this.q.poll();
+      if (cr == null) {
+        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves
for
+        // the fastpath handoff done via fastPathHandlerStack.
+        if (this.fastPathHandlerStack != null) {
+          this.fastPathHandlerStack.push(this);
+          this.semaphore.acquire();
+          cr = this.loadedCallRunner;
+        } else {
+          // No fastpath available. Block until a task comes available.
+          cr = super.getCallRunner();
+        }
+      }
+      return cr;
+    }
+
+    /**
+     * @param cr Task gotten via fastpath.
+     * @return True if we successfully loaded our task
+     */
+    boolean loadCallRunner(final CallRunner cr) {
+      this.loadedCallRunner = cr;
+      this.semaphore.release();
+      return true;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee86e91e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 880df36..5b6c6c8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Locale;
@@ -31,15 +32,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+/**
+ * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
+ * scheduling behavior.
+ */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
 public abstract class RpcExecutor {
   private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
 
@@ -48,7 +51,7 @@ public abstract class RpcExecutor {
   protected volatile int currentQueueLimit;
 
   private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
-  private final List<Thread> handlers;
+  private final List<Handler> handlers;
   private final int handlerCount;
   private final String name;
   private final AtomicInteger failedHandlerCount = new AtomicInteger(0);
@@ -59,7 +62,7 @@ public abstract class RpcExecutor {
   private Abortable abortable = null;
 
   public RpcExecutor(final String name, final int handlerCount) {
-    this.handlers = new ArrayList<Thread>(handlerCount);
+    this.handlers = new ArrayList<Handler>(handlerCount);
     this.handlerCount = handlerCount;
     this.name = Strings.nullToEmpty(name);
   }
@@ -101,75 +104,111 @@ public abstract class RpcExecutor {
     startHandlers(null, handlerCount, callQueues, 0, callQueues.size(), port);
   }
 
+  /**
+   * Override if providing alternate Handler implementation.
+   */
+  protected Handler getHandler(final String name, final double handlerFailureThreshhold,
+      final BlockingQueue<CallRunner> q) {
+    return new Handler(name, handlerFailureThreshhold, q);
+  }
+
+  /**
+   * Start up our handlers.
+   */
   protected void startHandlers(final String nameSuffix, final int numHandlers,
       final List<BlockingQueue<CallRunner>> callQueues,
       final int qindex, final int qsize, final int port) {
     final String threadPrefix = name + Strings.nullToEmpty(nameSuffix);
+    double handlerFailureThreshhold =
+        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
+          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
     for (int i = 0; i < numHandlers; i++) {
       final int index = qindex + (i % qsize);
-      Thread t = new Thread(new Runnable() {
-        @Override
-        public void run() {
-          consumerLoop(callQueues.get(index));
-        }
-      });
-      t.setDaemon(true);
-      t.setName(threadPrefix + "RpcServer.handler=" + handlers.size() +
-        ",queue=" + index + ",port=" + port);
-      t.start();
-      LOG.debug(threadPrefix + " Start Handler index=" + handlers.size() + " queue=" + index);
-      handlers.add(t);
+      String name = "RpcServer." + threadPrefix + ".handler=" + handlers.size() + ",queue="
+
+          index + ",port=" + port;
+      Handler handler = getHandler(name, handlerFailureThreshhold, callQueues.get(index));
+      handler.start();
+      LOG.debug("Started " + name);
+      handlers.add(handler);
     }
   }
 
-  protected void consumerLoop(final BlockingQueue<CallRunner> myQueue) {
-    boolean interrupted = false;
-    double handlerFailureThreshhold =
-        conf == null ? 1.0 : conf.getDouble(HConstants.REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT,
-          HConstants.DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT);
-    try {
-      while (running) {
-        try {
-          MonitoredRPCHandler status = RpcServer.getStatus();
-          CallRunner task = myQueue.take();
-          task.setStatus(status);
+  /**
+   * Handler thread run the {@link CallRunner#run()} in.
+   */
+  protected class Handler extends Thread {
+    /**
+     * Q to find CallRunners to run in.
+     */
+    final BlockingQueue<CallRunner> q;
+
+    final double handlerFailureThreshhold;
+
+    Handler(final String name, final double handlerFailureThreshhold,
+        final BlockingQueue<CallRunner> q) {
+      super(name);
+      setDaemon(true);
+      this.q = q;
+      this.handlerFailureThreshhold = handlerFailureThreshhold;
+    }
+
+    /**
+     * @return A {@link CallRunner}
+     * @throws InterruptedException
+     */
+    protected CallRunner getCallRunner() throws InterruptedException {
+      return this.q.take();
+    }
+
+    @Override
+    public void run() {
+      boolean interrupted = false;
+      try {
+        while (running) {
           try {
-            activeHandlerCount.incrementAndGet();
-            task.run();
-          } catch (Throwable e) {
-            if (e instanceof Error) {
-              int failedCount = failedHandlerCount.incrementAndGet();
-              if (handlerFailureThreshhold >= 0
-                  && failedCount > handlerCount * handlerFailureThreshhold) {
-                String message =
-                    "Number of failed RpcServer handler exceeded threshhold "
-                        + handlerFailureThreshhold + "  with failed reason: "
-                        + StringUtils.stringifyException(e);
-                if (abortable != null) {
-                  abortable.abort(message, e);
-                } else {
-                  LOG.error("Received " + StringUtils.stringifyException(e)
-                    + " but not aborting due to abortable being null");
-                  throw e;
-                }
-              } else {
-                LOG.warn("RpcServer handler threads encountered errors "
-                    + StringUtils.stringifyException(e));
-              }
+            run(getCallRunner());
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn(e);
+        throw e;
+      } finally {
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+
+    private void run(CallRunner cr) {
+      MonitoredRPCHandler status = RpcServer.getStatus();
+      cr.setStatus(status);
+      try {
+        activeHandlerCount.incrementAndGet();
+        cr.run();
+      } catch (Throwable e) {
+        if (e instanceof Error) {
+          int failedCount = failedHandlerCount.incrementAndGet();
+          if (this.handlerFailureThreshhold >= 0 &&
+              failedCount > handlerCount * this.handlerFailureThreshhold) {
+            String message = "Number of failed RpcServer handler runs exceeded threshhold
" +
+              this.handlerFailureThreshhold + "; reason: " + StringUtils.stringifyException(e);
+            if (abortable != null) {
+              abortable.abort(message, e);
             } else {
-              LOG.warn("RpcServer handler threads encountered exceptions "
-                  + StringUtils.stringifyException(e));
+              LOG.error("Error but can't abort because abortable is null: " +
+                  StringUtils.stringifyException(e));
+              throw e;
             }
-          } finally {
-            activeHandlerCount.decrementAndGet();
+          } else {
+            LOG.warn("Handler errors " + StringUtils.stringifyException(e));
           }
-        } catch (InterruptedException e) {
-          interrupted = true;
+        } else {
+          LOG.warn("Handler  exception " + StringUtils.stringifyException(e));
         }
-      }
-    } finally {
-      if (interrupted) {
-        Thread.currentThread().interrupt();
+      } finally {
+        activeHandlerCount.decrementAndGet();
       }
     }
   }
@@ -194,7 +233,6 @@ public abstract class RpcExecutor {
    * All requests go to the first queue, at index 0
    */
   private static QueueBalancer ONE_QUEUE = new QueueBalancer() {
-
     @Override
     public int getNextQueue() {
       return 0;
@@ -227,4 +265,4 @@ public abstract class RpcExecutor {
     }
     currentQueueLimit = conf.getInt(configKey, currentQueueLimit);
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee86e91e/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 85bf78d..6e623a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -99,7 +99,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
     String callQueueType = conf.get(CALL_QUEUE_TYPE_CONF_KEY,
       CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
 
-    if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+    if (isCodelQueueType(callQueueType)) {
       // update CoDel Scheduler tunables
       int codelTargetDelay = conf.getInt(CALL_QUEUE_CODEL_TARGET_DELAY,
         CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY);
@@ -204,18 +204,19 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       // multiple read/write queues
       if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
-        callExecutor = new RWQueueRpcExecutor("RW.deadline.Q", handlerCount, numCallQueues,
+        callExecutor = new RWQueueRpcExecutor("DeadlineRWQ.default", handlerCount, numCallQueues,
             callqReadShare, callqScanShare, maxQueueLength, conf, abortable,
             BoundedPriorityBlockingQueue.class, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
-        callExecutor = new RWQueueRpcExecutor("RW.codel.Q", handlerCount,
+        callExecutor = new RWQueueRpcExecutor("CodelRWQ.default", handlerCount,
           numCallQueues, callqReadShare, callqScanShare,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
-        callExecutor = new RWQueueRpcExecutor("RW.fifo.Q", handlerCount, numCallQueues,
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
     } else {
@@ -223,33 +224,37 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       if (isDeadlineQueueType(callQueueType)) {
         CallPriorityComparator callPriority = new CallPriorityComparator(conf, this.priority);
         callExecutor =
-          new BalancedQueueRpcExecutor("B.deadline.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("DeadlineBQ.default", handlerCount, numCallQueues,
             conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
-      } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
+      } else if (isCodelQueueType(callQueueType)) {
         callExecutor =
-          new BalancedQueueRpcExecutor("B.codel.Q", handlerCount, numCallQueues,
+          new BalancedQueueRpcExecutor("CodelBQ.default", handlerCount, numCallQueues,
             conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
             codelTargetDelay, codelInterval, codelLifoThreshold,
             numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        callExecutor = new BalancedQueueRpcExecutor("B.fifo.Q", handlerCount,
-            numCallQueues, maxQueueLength, conf, abortable);
+        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+            handlerCount, numCallQueues, maxQueueLength, conf, abortable);
       }
     }
     // Create 2 queues to help priorityExecutor be more scalable.
-    this.priorityExecutor = priorityHandlerCount > 0 ?
-      new BalancedQueueRpcExecutor("B.priority.fifo.Q", priorityHandlerCount, 2,
-          maxPriorityQueueLength):
-      null;
-   this.replicationExecutor =
-     replicationHandlerCount > 0 ? new BalancedQueueRpcExecutor("B.replication.fifo.Q",
-       replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
+    this.priorityExecutor = priorityHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+         2, maxPriorityQueueLength, conf, abortable): null;
+    this.replicationExecutor = replicationHandlerCount > 0?
+      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+        replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
 
   private static boolean isDeadlineQueueType(final String callQueueType) {
     return callQueueType.equals(CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE);
   }
 
+  private static boolean isCodelQueueType(final String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
+  }
+
   public SimpleRpcScheduler(
 	      Configuration conf,
 	      int handlerCount,

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee86e91e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 8ae3078..53f9175 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
 import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
 import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.RPCTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
@@ -242,7 +241,7 @@ public class TestSimpleRpcScheduler {
       // -> WITH REORDER [10 10 10 10 10 10 50 100] -> 530 (Deadline Queue)
       if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE)) {
         assertEquals(530, totalTime);
-      } else /* if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE))
*/ {
+      } else if (queueType.equals(SimpleRpcScheduler.CALL_QUEUE_TYPE_FIFO_CONF_VALUE)) {
         assertEquals(930, totalTime);
       }
     } finally {


Mime
View raw message