hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ecl...@apache.org
Subject hbase git commit: HBASE-16089 Add on FastPath for CoDel
Date Sat, 25 Jun 2016 20:24:40 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 1d06850f4 -> fc4b8aa89


HBASE-16089 Add on FastPath for CoDel


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fc4b8aa8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fc4b8aa8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fc4b8aa8

Branch: refs/heads/master
Commit: fc4b8aa89d22961feb1ab7849104e68f6e1c0153
Parents: 1d06850
Author: Elliott Clark <eclark@apache.org>
Authored: Wed Jun 22 16:34:40 2016 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Sat Jun 25 12:57:15 2016 -0700

----------------------------------------------------------------------
 .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java   |  59 ++++++---
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |   4 +-
 .../ipc/FastPathBalancedQueueRpcExecutor.java   | 126 +++++++++++++++++++
 ...ifoWithFastPathBalancedQueueRpcExecutor.java | 116 -----------------
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  14 +--
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  74 +++++++----
 6 files changed, 227 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
index 08c488b..42b500f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java
@@ -73,7 +73,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
   private AtomicBoolean resetDelay = new AtomicBoolean(true);
 
   // if we're in this mode, "long" calls are getting dropped
-  private volatile boolean isOverloaded;
+  private AtomicBoolean isOverloaded = new AtomicBoolean(false);
 
   public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval,
       double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) {
@@ -126,6 +126,34 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
     }
   }
 
+  @Override
+  public CallRunner poll() {
+    CallRunner cr;
+    boolean switched = false;
+    while(true) {
+      if (((double) queue.size() / this.maxCapacity) > lifoThreshold) {
+        // Only count once per switch.
+        if (!switched) {
+          switched = true;
+          numLifoModeSwitches.incrementAndGet();
+        }
+        cr = queue.pollLast();
+      } else {
+        switched = false;
+        cr = queue.pollFirst();
+      }
+      if (cr == null) {
+        return cr;
+      }
+      if (needToDrop(cr)) {
+        numGeneralCallsDropped.incrementAndGet();
+        cr.drop();
+      } else {
+        return cr;
+      }
+    }
+  }
+
   /**
    * @param callRunner to validate
    * @return true if this call needs to be skipped based on call timestamp
@@ -136,28 +164,28 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
     long callDelay = now - callRunner.getCall().timestamp;
 
     long localMinDelay = this.minDelay;
-    if (now > intervalTime && !resetDelay.getAndSet(true)) {
+
+    // Try and determine if we should reset
+    // the delay time and determine overload
+    if (now > intervalTime &&
+        !resetDelay.get() &&
+        !resetDelay.getAndSet(true)) {
       intervalTime = now + codelInterval;
 
-      if (localMinDelay > codelTargetDelay) {
-        isOverloaded = true;
-      } else {
-        isOverloaded = false;
-      }
+      isOverloaded.set(localMinDelay > codelTargetDelay);
     }
 
-    if (resetDelay.getAndSet(false)) {
+    // If it looks like we should reset the delay
+    // time do it only once on one thread
+    if (resetDelay.get() && resetDelay.getAndSet(false)) {
       minDelay = callDelay;
+      // we just reset the delay dunno about how this will work
       return false;
     } else if (callDelay < localMinDelay) {
       minDelay = callDelay;
     }
 
-    if (isOverloaded && callDelay > 2 * codelTargetDelay) {
-      return true;
-    } else {
-      return false;
-    }
+    return isOverloaded.get() && callDelay > 2 * codelTargetDelay;
   }
 
   // Generic BlockingQueue methods we support
@@ -185,11 +213,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
       + " but take() and offer() methods");
   }
 
-  @Override
-  public CallRunner poll() {
-    throw new UnsupportedOperationException("This class doesn't support anything,"
-      + " but take() and offer() methods");
-  }
 
   @Override
   public CallRunner peek() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 00e08c9..e91699a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -64,7 +64,9 @@ public class CallRunner {
     this.call = call;
     this.rpcServer = rpcServer;
     // Add size of the call to queue size.
-    this.rpcServer.addCallSize(call.getSize());
+    if (call != null && rpcServer != null) {
+      this.rpcServer.addCallSize(call.getSize());
+    }
   }
 
   public Call getCall() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
new file mode 100644
index 0000000..4e06f4f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Deque;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
+ * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible.
+ * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
+ * rather than let them go cold and lose context. Idea taken from Apache Kudu (incubating). See
+ * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
+ */
+@InterfaceAudience.Private
+public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
+  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
+
+  /*
+   * Stack of Handlers waiting for work.
+   */
+  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
+
+  public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
+                                          final int numQueues, final int maxQueueLength, final Configuration conf,
+                                          final Abortable abortable) {
+    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
+        maxQueueLength);
+  }
+
+  public FastPathBalancedQueueRpcExecutor(String name, int handlerCount,
+                                          int numCallQueues,
+                                          Configuration conf,
+                                          Abortable abortable,
+                                          Class<? extends BlockingQueue> queueClass,
+                                          Object... args) {
+    super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args);
+  }
+
+  @Override
+  protected Handler getHandler(String name, double handlerFailureThreshhold,
+      BlockingQueue<CallRunner> q) {
+    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
+  }
+
+  @Override
+  public boolean dispatch(CallRunner callTask) throws InterruptedException {
+    FastPathHandler handler = popReadyHandler();
+    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
+  }
+
+  /**
+   * @return Pop a Handler instance if one available ready-to-go or else return null.
+   */
+  private FastPathHandler popReadyHandler() {
+    return this.fastPathHandlerStack.poll();
+  }
+
+  class FastPathHandler extends Handler {
+    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
+    // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
+    final Deque<FastPathHandler> fastPathHandlerStack;
+    // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
+    private Semaphore semaphore = new Semaphore(0);
+    // The task we get when fast-pathing.
+    private CallRunner loadedCallRunner;
+
+    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
+        final Deque<FastPathHandler> fastPathHandlerStack) {
+      super(name, handlerFailureThreshhold, q);
+      this.fastPathHandlerStack = fastPathHandlerStack;
+    }
+
+    protected CallRunner getCallRunner() throws InterruptedException {
+      // Get a callrunner if one in the Q.
+      CallRunner cr = this.q.poll();
+      if (cr == null) {
+        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
+        // the fastpath handoff done via fastPathHandlerStack.
+        if (this.fastPathHandlerStack != null) {
+          this.fastPathHandlerStack.push(this);
+          this.semaphore.acquire();
+          cr = this.loadedCallRunner;
+          this.loadedCallRunner = null;
+        } else {
+          // No fastpath available. Block until a task comes available.
+          cr = super.getCallRunner();
+        }
+      }
+      return cr;
+    }
+
+    /**
+     * @param task Task gotten via fastpath.
+     * @return True if we successfully loaded our task
+     */
+    boolean loadCallRunner(final CallRunner cr) {
+      this.loadedCallRunner = cr;
+      this.semaphore.release();
+      return true;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
deleted file mode 100644
index 1a362bc..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.ipc;
-
-import java.util.Deque;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Semaphore;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for
- * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible.
- * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling
- * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See
- * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h
- */
-@InterfaceAudience.Private
-public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
-  // Depends on default behavior of BalancedQueueRpcExecutor being FIFO!
-
-  /*
-   * Stack of Handlers waiting for work.
-   */
-  private final Deque<FastPathHandler> fastPathHandlerStack = new ConcurrentLinkedDeque<>();
-
-  public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount,
-      final int numQueues, final int maxQueueLength, final Configuration conf,
-      final Abortable abortable) {
-    super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class,
-        maxQueueLength);
-  }
-
-  @Override
-  protected Handler getHandler(String name, double handlerFailureThreshhold,
-      BlockingQueue<CallRunner> q) {
-    return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack);
-  }
-
-  @Override
-  public boolean dispatch(CallRunner callTask) throws InterruptedException {
-    FastPathHandler handler = popReadyHandler();
-    return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
-  }
-
-  /**
-   * @return Pop a Handler instance if one available ready-to-go or else return null.
-   */
-  private FastPathHandler popReadyHandler() {
-    return this.fastPathHandlerStack.poll();
-  }
-
-  class FastPathHandler extends Handler {
-    // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque
-    // if an empty queue of CallRunners so we are available for direct handoff when one comes in.
-    final Deque<FastPathHandler> fastPathHandlerStack;
-    // Semaphore to coordinate loading of fastpathed loadedTask and our running it.
-    private Semaphore semaphore = new Semaphore(0);
-    // The task we get when fast-pathing.
-    private CallRunner loadedCallRunner;
-
-    FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue<CallRunner> q,
-        final Deque<FastPathHandler> fastPathHandlerStack) {
-      super(name, handlerFailureThreshhold, q);
-      this.fastPathHandlerStack = fastPathHandlerStack;
-    }
-
-    protected CallRunner getCallRunner() throws InterruptedException {
-      // Get a callrunner if one in the Q.
-      CallRunner cr = this.q.poll();
-      if (cr == null) {
-        // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for
-        // the fastpath handoff done via fastPathHandlerStack.
-        if (this.fastPathHandlerStack != null) {
-          this.fastPathHandlerStack.push(this);
-          this.semaphore.acquire();
-          cr = this.loadedCallRunner;
-          this.loadedCallRunner = null;
-        } else {
-          // No fastpath available. Block until a task comes available.
-          cr = super.getCallRunner();
-        }
-      }
-      return cr;
-    }
-
-    /**
-     * @param task Task gotten via fastpath.
-     * @return True if we successfully loaded our task
-     */
-    boolean loadCallRunner(final CallRunner cr) {
-      this.loadedCallRunner = cr;
-      this.semaphore.release();
-      return true;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 6e623a6..e78b928 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -75,7 +75,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
   public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
     "hbase.ipc.server.callqueue.codel.lifo.threshold";
 
-  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5;
+  public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100;
   public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
   public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
 
@@ -215,7 +215,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
           AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
       } else {
-        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
+        // FifoWFPBQ = FastPathBalancedQueueRpcExecutor
         callExecutor = new RWQueueRpcExecutor("FifoRWQ.default", handlerCount, numCallQueues,
           callqReadShare, callqScanShare, maxQueueLength, conf, abortable);
       }
@@ -228,22 +228,22 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
             conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority);
       } else if (isCodelQueueType(callQueueType)) {
         callExecutor =
-          new BalancedQueueRpcExecutor("CodelBQ.default", handlerCount, numCallQueues,
+          new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues,
             conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength,
             codelTargetDelay, codelInterval, codelLifoThreshold,
             numGeneralCallsDropped, numLifoModeSwitches);
       } else {
-        // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor
-        callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
+        // FifoWFPBQ = FastPathBalancedQueueRpcExecutor
+        callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default",
             handlerCount, numCallQueues, maxQueueLength, conf, abortable);
       }
     }
     // Create 2 queues to help priorityExecutor be more scalable.
     this.priorityExecutor = priorityHandlerCount > 0?
-      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
+      new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount,
          2, maxPriorityQueueLength, conf, abortable): null;
     this.replicationExecutor = replicationHandlerCount > 0?
-      new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
+      new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication",
         replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/fc4b8aa8/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 0e50761..0190027 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -40,6 +40,7 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadLocalRandom;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -77,8 +78,11 @@ import com.google.protobuf.Message;
 
 @Category({RPCTests.class, SmallTests.class})
 public class TestSimpleRpcScheduler {
-  @Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
-      withLookingForStuckThread(true).build();
+  @Rule
+  public final TestRule timeout =
+      CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+          withLookingForStuckThread(true).build();
+
   private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
 
   private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
@@ -404,7 +408,8 @@ public class TestSimpleRpcScheduler {
     @Override
     public long currentTime() {
       for (String threadNamePrefix : threadNamePrefixs) {
-        if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
+        String threadName = Thread.currentThread().getName();
+        if (threadName.startsWith(threadNamePrefix)) {
           return timeQ.poll().longValue() + offset;
         }
       }
@@ -415,9 +420,9 @@ public class TestSimpleRpcScheduler {
   @Test
   public void testCoDelScheduling() throws Exception {
     CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
-    envEdge.threadNamePrefixs.add("RW.default");
-    envEdge.threadNamePrefixs.add("B.default");
+    envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler");
     Configuration schedConf = HBaseConfiguration.create();
+    schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
 
     schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
       SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE);
@@ -435,8 +440,7 @@ public class TestSimpleRpcScheduler {
       for (int i = 0; i < 100; i++) {
         long time = System.currentTimeMillis();
         envEdge.timeQ.put(time);
-        CallRunner cr = getMockedCallRunner(time);
-        Thread.sleep(5);
+        CallRunner cr = getMockedCallRunner(time, 2);
         scheduler.dispatch(cr);
       }
       // make sure fast calls are handled
@@ -445,13 +449,12 @@ public class TestSimpleRpcScheduler {
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
-      envEdge.offset = 6;
+      envEdge.offset = 151;
       // calls slower than min delay, but not individually slow enough to be dropped
       for (int i = 0; i < 20; i++) {
         long time = System.currentTimeMillis();
         envEdge.timeQ.put(time);
-        CallRunner cr = getMockedCallRunner(time);
-        Thread.sleep(6);
+        CallRunner cr = getMockedCallRunner(time, 2);
         scheduler.dispatch(cr);
       }
 
@@ -461,35 +464,58 @@ public class TestSimpleRpcScheduler {
       assertEquals("None of these calls should have been discarded", 0,
         scheduler.getNumGeneralCallsDropped());
 
-      envEdge.offset = 12;
+      envEdge.offset = 2000;
       // now slow calls and the ones to be dropped
-      for (int i = 0; i < 20; i++) {
+      for (int i = 0; i < 60; i++) {
         long time = System.currentTimeMillis();
         envEdge.timeQ.put(time);
-        CallRunner cr = getMockedCallRunner(time);
-        Thread.sleep(12);
+        CallRunner cr = getMockedCallRunner(time, 100);
         scheduler.dispatch(cr);
       }
 
       // make sure somewhat slow calls are handled
       waitUntilQueueEmpty(scheduler);
       Thread.sleep(100);
-      assertTrue("There should have been at least 12 calls dropped",
-        scheduler.getNumGeneralCallsDropped() > 12);
+      assertTrue(
+          "There should have been at least 12 calls dropped however there were "
+              + scheduler.getNumGeneralCallsDropped(),
+          scheduler.getNumGeneralCallsDropped() > 12);
     } finally {
       scheduler.stop();
     }
   }
 
-  private CallRunner getMockedCallRunner(long timestamp) throws IOException {
-    CallRunner putCallTask = mock(CallRunner.class);
-    RpcServer.Call putCall = mock(RpcServer.Call.class);
+  // Get mocked call that has the CallRunner sleep for a while so that the fast
+  // path isn't hit.
+  private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
+    final RpcServer.Call putCall = mock(RpcServer.Call.class);
+
+    putCall.timestamp = timestamp;
     putCall.param = RequestConverter.buildMutateRequest(
-      Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
-    RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
-    when(putCallTask.getCall()).thenReturn(putCall);
+        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+
+    RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder()
+                                                             .setMethodName("mutate")
+                                                             .build();
+    when(putCall.getSize()).thenReturn(9L);
     when(putCall.getHeader()).thenReturn(putHead);
-    putCall.timestamp = timestamp;
-    return putCallTask;
+
+    CallRunner cr = new CallRunner(null, putCall) {
+      public void run() {
+        try {
+          LOG.warn("Sleeping for " + sleepTime);
+          Thread.sleep(sleepTime);
+          LOG.warn("Done Sleeping for " + sleepTime);
+        } catch (InterruptedException e) {
+        }
+      }
+      public Call getCall() {
+        return putCall;
+      }
+
+      public void drop() {}
+    };
+
+    return cr;
   }
 }


Mime
View raw message