Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2B1F6200B2A for ; Sat, 25 Jun 2016 22:24:35 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1EC5B160A66; Sat, 25 Jun 2016 20:24:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BA3E5160A49 for ; Sat, 25 Jun 2016 22:24:33 +0200 (CEST) Received: (qmail 62311 invoked by uid 500); 25 Jun 2016 20:24:32 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 62302 invoked by uid 99); 25 Jun 2016 20:24:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 25 Jun 2016 20:24:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 84C98E094C; Sat, 25 Jun 2016 20:24:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: eclark@apache.org To: commits@hbase.apache.org Message-Id: <352c40a6b81e4321ad28739087dfbc9d@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-16089 Add on FastPath for CoDel Date: Sat, 25 Jun 2016 20:24:32 +0000 (UTC) archived-at: Sat, 25 Jun 2016 20:24:35 -0000 Repository: hbase Updated Branches: refs/heads/branch-1.3 e7bfd9bfe -> 65cbb6c2e HBASE-16089 Add on FastPath for CoDel Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/65cbb6c2 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/65cbb6c2 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/65cbb6c2 Branch: refs/heads/branch-1.3 Commit: 65cbb6c2e7335596c6edf8c0243bcd66722641bb Parents: e7bfd9b Author: Elliott Clark Authored: Wed Jun 22 16:34:40 2016 -0700 Committer: Elliott Clark Committed: Sat Jun 25 12:59:37 2016 -0700 ---------------------------------------------------------------------- .../hbase/ipc/AdaptiveLifoCoDelCallQueue.java | 59 ++++++--- .../org/apache/hadoop/hbase/ipc/CallRunner.java | 4 +- .../ipc/FastPathBalancedQueueRpcExecutor.java | 126 +++++++++++++++++++ ...ifoWithFastPathBalancedQueueRpcExecutor.java | 116 ----------------- .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 12 +- .../hbase/ipc/TestSimpleRpcScheduler.java | 67 ++++++---- 6 files changed, 221 insertions(+), 163 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 08c488b..42b500f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -73,7 +73,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private AtomicBoolean resetDelay = new AtomicBoolean(true); // if we're in this mode, "long" calls are getting dropped - private volatile boolean isOverloaded; + private AtomicBoolean isOverloaded = new AtomicBoolean(false); public AdaptiveLifoCoDelCallQueue(int capacity, int targetDelay, int interval, double lifoThreshold, AtomicLong numGeneralCallsDropped, AtomicLong numLifoModeSwitches) { @@ -126,6 +126,34 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { } } + @Override + public CallRunner poll() { + CallRunner cr; + boolean switched = false; + while(true) { + if (((double) queue.size() / this.maxCapacity) > lifoThreshold) { + // Only count once per switch. + if (!switched) { + switched = true; + numLifoModeSwitches.incrementAndGet(); + } + cr = queue.pollLast(); + } else { + switched = false; + cr = queue.pollFirst(); + } + if (cr == null) { + return cr; + } + if (needToDrop(cr)) { + numGeneralCallsDropped.incrementAndGet(); + cr.drop(); + } else { + return cr; + } + } + } + /** * @param callRunner to validate * @return true if this call needs to be skipped based on call timestamp @@ -136,28 +164,28 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { long callDelay = now - callRunner.getCall().timestamp; long localMinDelay = this.minDelay; - if (now > intervalTime && !resetDelay.getAndSet(true)) { + + // Try and determine if we should reset + // the delay time and determine overload + if (now > intervalTime && + !resetDelay.get() && + !resetDelay.getAndSet(true)) { intervalTime = now + codelInterval; - if (localMinDelay > codelTargetDelay) { - isOverloaded = true; - } else { - isOverloaded = false; - } + isOverloaded.set(localMinDelay > codelTargetDelay); } - if (resetDelay.getAndSet(false)) { + // If it looks like we should reset the delay + // time do it only once on one thread + if (resetDelay.get() && resetDelay.getAndSet(false)) { minDelay = callDelay; + // we just reset the delay dunno about how this will work return false; } else if (callDelay < localMinDelay) { minDelay = callDelay; } - if (isOverloaded && callDelay > 2 * codelTargetDelay) { - return true; - } else { - return false; - } + return isOverloaded.get() && callDelay > 2 * codelTargetDelay; } // Generic BlockingQueue methods we support @@ -185,11 +213,6 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { + " but take() and offer() methods"); } - @Override - public CallRunner poll() { - throw new UnsupportedOperationException("This class doesn't support anything," - + " but take() and offer() methods"); - } @Override public CallRunner peek() { http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java index 00e08c9..e91699a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java @@ -64,7 +64,9 @@ public class CallRunner { this.call = call; this.rpcServer = rpcServer; // Add size of the call to queue size. - this.rpcServer.addCallSize(call.getSize()); + if (call != null && rpcServer != null) { + this.rpcServer.addCallSize(call.getSize()); + } } public Call getCall() { http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java new file mode 100644 index 0000000..4e06f4f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -0,0 +1,126 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.ipc; + +import java.util.Deque; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * Balanced queue executor with a fastpath. Because this is FIFO, it has no respect for + * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible. + * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling + * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See + * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h + */ +@InterfaceAudience.Private +public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { + // Depends on default behavior of BalancedQueueRpcExecutor being FIFO! + + /* + * Stack of Handlers waiting for work. + */ + private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); + + public FastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, + final int numQueues, final int maxQueueLength, final Configuration conf, + final Abortable abortable) { + super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, + maxQueueLength); + } + + public FastPathBalancedQueueRpcExecutor(String name, int handlerCount, + int numCallQueues, + Configuration conf, + Abortable abortable, + Class queueClass, + Object... args) { + super(name, handlerCount, numCallQueues, conf, abortable, queueClass, args); + } + + @Override + protected Handler getHandler(String name, double handlerFailureThreshhold, + BlockingQueue q) { + return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack); + } + + @Override + public boolean dispatch(CallRunner callTask) throws InterruptedException { + FastPathHandler handler = popReadyHandler(); + return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); + } + + /** + * @return Pop a Handler instance if one available ready-to-go or else return null. + */ + private FastPathHandler popReadyHandler() { + return this.fastPathHandlerStack.poll(); + } + + class FastPathHandler extends Handler { + // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque + // if an empty queue of CallRunners so we are available for direct handoff when one comes in. + final Deque fastPathHandlerStack; + // Semaphore to coordinate loading of fastpathed loadedTask and our running it. + private Semaphore semaphore = new Semaphore(0); + // The task we get when fast-pathing. + private CallRunner loadedCallRunner; + + FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, + final Deque fastPathHandlerStack) { + super(name, handlerFailureThreshhold, q); + this.fastPathHandlerStack = fastPathHandlerStack; + } + + protected CallRunner getCallRunner() throws InterruptedException { + // Get a callrunner if one in the Q. + CallRunner cr = this.q.poll(); + if (cr == null) { + // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for + // the fastpath handoff done via fastPathHandlerStack. + if (this.fastPathHandlerStack != null) { + this.fastPathHandlerStack.push(this); + this.semaphore.acquire(); + cr = this.loadedCallRunner; + this.loadedCallRunner = null; + } else { + // No fastpath available. Block until a task comes available. + cr = super.getCallRunner(); + } + } + return cr; + } + + /** + * @param task Task gotten via fastpath. + * @return True if we successfully loaded our task + */ + boolean loadCallRunner(final CallRunner cr) { + this.loadedCallRunner = cr; + this.semaphore.release(); + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java deleted file mode 100644 index 1a362bc..0000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoWithFastPathBalancedQueueRpcExecutor.java +++ /dev/null @@ -1,116 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.ipc; - -import java.util.Deque; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.Semaphore; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.classification.InterfaceAudience; - -/** - * FIFO balanced queue executor with a fastpath. Because this is FIFO, it has no respect for - * ordering so a fast path skipping the queuing of Calls if an Handler is available, is possible. - * Just pass the Call direct to waiting Handler thread. Try to keep the hot Handlers bubbling - * rather than let them go cold and lose context. Idea taken from Apace Kudu (incubating). See - * https://gerrit.cloudera.org/#/c/2938/7/src/kudu/rpc/service_queue.h - */ -@InterfaceAudience.Private -public class FifoWithFastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { - // Depends on default behavior of BalancedQueueRpcExecutor being FIFO! - - /* - * Stack of Handlers waiting for work. - */ - private final Deque fastPathHandlerStack = new ConcurrentLinkedDeque<>(); - - public FifoWithFastPathBalancedQueueRpcExecutor(final String name, final int handlerCount, - final int numQueues, final int maxQueueLength, final Configuration conf, - final Abortable abortable) { - super(name, handlerCount, numQueues, conf, abortable, LinkedBlockingQueue.class, - maxQueueLength); - } - - @Override - protected Handler getHandler(String name, double handlerFailureThreshhold, - BlockingQueue q) { - return new FastPathHandler(name, handlerFailureThreshhold, q, fastPathHandlerStack); - } - - @Override - public boolean dispatch(CallRunner callTask) throws InterruptedException { - FastPathHandler handler = popReadyHandler(); - return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); - } - - /** - * @return Pop a Handler instance if one available ready-to-go or else return null. - */ - private FastPathHandler popReadyHandler() { - return this.fastPathHandlerStack.poll(); - } - - class FastPathHandler extends Handler { - // Below are for fast-path support. Push this Handler on to the fastPathHandlerStack Deque - // if an empty queue of CallRunners so we are available for direct handoff when one comes in. - final Deque fastPathHandlerStack; - // Semaphore to coordinate loading of fastpathed loadedTask and our running it. - private Semaphore semaphore = new Semaphore(0); - // The task we get when fast-pathing. - private CallRunner loadedCallRunner; - - FastPathHandler(String name, double handlerFailureThreshhold, BlockingQueue q, - final Deque fastPathHandlerStack) { - super(name, handlerFailureThreshhold, q); - this.fastPathHandlerStack = fastPathHandlerStack; - } - - protected CallRunner getCallRunner() throws InterruptedException { - // Get a callrunner if one in the Q. - CallRunner cr = this.q.poll(); - if (cr == null) { - // Else, if a fastPathHandlerStack present and no callrunner in Q, register ourselves for - // the fastpath handoff done via fastPathHandlerStack. - if (this.fastPathHandlerStack != null) { - this.fastPathHandlerStack.push(this); - this.semaphore.acquire(); - cr = this.loadedCallRunner; - this.loadedCallRunner = null; - } else { - // No fastpath available. Block until a task comes available. - cr = super.getCallRunner(); - } - } - return cr; - } - - /** - * @param task Task gotten via fastpath. - * @return True if we successfully loaded our task - */ - boolean loadCallRunner(final CallRunner cr) { - this.loadedCallRunner = cr; - this.semaphore.release(); - return true; - } - } -} http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 5e68dc8..7574965 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -72,7 +72,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold"; - public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 5; + public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100; public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100; public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8; @@ -226,23 +226,23 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs conf, abortable, BoundedPriorityBlockingQueue.class, maxQueueLength, callPriority); } else if (isCodelQueueType(callQueueType)) { callExecutor = - new BalancedQueueRpcExecutor("BQCodel.default", handlerCount, numCallQueues, + new FastPathBalancedQueueRpcExecutor("CodelFPBQ.default", handlerCount, numCallQueues, conf, abortable, AdaptiveLifoCoDelCallQueue.class, maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches); } else { - // FifoWFPBQ = FifoWithFastPathBalancedQueueRpcExecutor - callExecutor = new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.default", + // FifoWFPBQ = FastPathBalancedQueueRpcExecutor + callExecutor = new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.default", handlerCount, numCallQueues, maxQueueLength, conf, abortable); } } // Create 2 queues to help priorityExecutor be more scalable. this.priorityExecutor = priorityHandlerCount > 0? - new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount, + new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.priority", priorityHandlerCount, 2, maxPriorityQueueLength, conf, abortable): null; this.replicationExecutor = replicationHandlerCount > 0? - new FifoWithFastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication", + new FastPathBalancedQueueRpcExecutor("FifoWFPBQ.replication", replicationHandlerCount, 1, maxQueueLength, conf, abortable) : null; } http://git-wip-us.apache.org/repos/asf/hbase/blob/65cbb6c2/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 1fda747..53addf1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -40,6 +40,7 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadLocalRandom; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -398,7 +399,8 @@ public class TestSimpleRpcScheduler { @Override public long currentTime() { for (String threadNamePrefix : threadNamePrefixs) { - if (Thread.currentThread().getName().startsWith(threadNamePrefix)) { + String threadName = Thread.currentThread().getName(); + if (threadName.startsWith(threadNamePrefix)) { return timeQ.poll().longValue() + offset; } } @@ -409,9 +411,9 @@ public class TestSimpleRpcScheduler { @Test public void testCoDelScheduling() throws Exception { CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); - envEdge.threadNamePrefixs.add("RW.default"); - envEdge.threadNamePrefixs.add("B.default"); + envEdge.threadNamePrefixs.add("RpcServer.CodelBQ.default.handler"); Configuration schedConf = HBaseConfiguration.create(); + schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, SimpleRpcScheduler.CALL_QUEUE_TYPE_CODEL_CONF_VALUE); @@ -429,8 +431,7 @@ public class TestSimpleRpcScheduler { for (int i = 0; i < 100; i++) { long time = System.currentTimeMillis(); envEdge.timeQ.put(time); - CallRunner cr = getMockedCallRunner(time); - Thread.sleep(5); + CallRunner cr = getMockedCallRunner(time, 2); scheduler.dispatch(cr); } // make sure fast calls are handled @@ -439,13 +440,12 @@ public class TestSimpleRpcScheduler { assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); - envEdge.offset = 6; + envEdge.offset = 151; // calls slower than min delay, but not individually slow enough to be dropped for (int i = 0; i < 20; i++) { long time = System.currentTimeMillis(); envEdge.timeQ.put(time); - CallRunner cr = getMockedCallRunner(time); - Thread.sleep(6); + CallRunner cr = getMockedCallRunner(time, 2); scheduler.dispatch(cr); } @@ -455,35 +455,58 @@ public class TestSimpleRpcScheduler { assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); - envEdge.offset = 12; + envEdge.offset = 2000; // now slow calls and the ones to be dropped - for (int i = 0; i < 20; i++) { + for (int i = 0; i < 60; i++) { long time = System.currentTimeMillis(); envEdge.timeQ.put(time); - CallRunner cr = getMockedCallRunner(time); - Thread.sleep(12); + CallRunner cr = getMockedCallRunner(time, 100); scheduler.dispatch(cr); } // make sure somewhat slow calls are handled waitUntilQueueEmpty(scheduler); Thread.sleep(100); - assertTrue("There should have been at least 12 calls dropped", - scheduler.getNumGeneralCallsDropped() > 12); + assertTrue( + "There should have been at least 12 calls dropped however there were " + + scheduler.getNumGeneralCallsDropped(), + scheduler.getNumGeneralCallsDropped() > 12); } finally { scheduler.stop(); } } - private CallRunner getMockedCallRunner(long timestamp) throws IOException { - CallRunner putCallTask = mock(CallRunner.class); - RpcServer.Call putCall = mock(RpcServer.Call.class); + // Get mocked call that has the CallRunner sleep for a while so that the fast + // path isn't hit. + private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { + final RpcServer.Call putCall = mock(RpcServer.Call.class); + + putCall.timestamp = timestamp; putCall.param = RequestConverter.buildMutateRequest( - Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); - RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(); - when(putCallTask.getCall()).thenReturn(putCall); + Bytes.toBytes("abc"), new Put(Bytes.toBytes("row"))); + + RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder() + .setMethodName("mutate") + .build(); + when(putCall.getSize()).thenReturn(9L); when(putCall.getHeader()).thenReturn(putHead); - putCall.timestamp = timestamp; - return putCallTask; + + CallRunner cr = new CallRunner(null, putCall) { + public void run() { + try { + LOG.warn("Sleeping for " + sleepTime); + Thread.sleep(sleepTime); + LOG.warn("Done Sleeping for " + sleepTime); + } catch (InterruptedException e) { + } + } + public Call getCall() { + return putCall; + } + + public void drop() {} + }; + + return cr; } }