hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject [10/21] hbase git commit: HBASE-16290 Dump summary of callQueue content; can help debugging
Date Wed, 25 Oct 2017 12:28:32 GMT
HBASE-16290 Dump summary of callQueue content; can help debugging

Signed-off-by: Chia-Ping Tsai <chia7712@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7cdfbde3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7cdfbde3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7cdfbde3

Branch: refs/heads/HBASE-18410
Commit: 7cdfbde37dae11261b4be12cc086058826a8037a
Parents: 3969b85
Author: Sreeram Venkatasubramanian <sreeram_v@infosys.com>
Authored: Tue Oct 24 14:03:32 2017 +0800
Committer: Chia-Ping Tsai <chia7712@gmail.com>
Committed: Tue Oct 24 14:28:42 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/ipc/CallQueueInfo.java  |  81 ++++++++++
 .../hadoop/hbase/ipc/FifoRpcScheduler.java      |  57 ++++++-
 .../apache/hadoop/hbase/ipc/RpcExecutor.java    |  47 ++++++
 .../apache/hadoop/hbase/ipc/RpcScheduler.java   |   3 +
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java    |  28 ++++
 .../hbase/regionserver/RSDumpServlet.java       |  33 ++++
 .../hbase/ipc/DelegatingRpcScheduler.java       |   5 +
 .../hadoop/hbase/ipc/TestFifoRpcScheduler.java  | 160 +++++++++++++++++++
 .../hbase/ipc/TestSimpleRpcScheduler.java       |  68 ++++++++
 9 files changed, 481 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueInfo.java
new file mode 100644
index 0000000..19a75ea
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallQueueInfo.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+
+@InterfaceAudience.Private
+public class CallQueueInfo {
+  private final Map<String, Map<String, Long>> callQueueMethodCountsSummary;
+  private final Map<String, Map<String, Long>> callQueueMethodSizeSummary;
+
+  CallQueueInfo() {
+    callQueueMethodCountsSummary = new HashMap<>();
+    callQueueMethodSizeSummary = new HashMap<>();
+  }
+
+  public Set<String> getCallQueueNames() {
+    return callQueueMethodCountsSummary.keySet();
+  }
+
+  public Set<String> getCalledMethodNames(String callQueueName) {
+    return callQueueMethodCountsSummary.get(callQueueName).keySet();
+  }
+
+  public long getCallMethodCount(String callQueueName, String methodName) {
+    long methodCount;
+
+    Map<String, Long> methodCountMap = callQueueMethodCountsSummary.getOrDefault(callQueueName,
null);
+
+    if (null != methodCountMap) {
+      methodCount = methodCountMap.getOrDefault(methodName, 0L);
+    } else {
+      methodCount = 0L;
+    }
+
+    return methodCount;
+  }
+
+  void setCallMethodCount(String callQueueName, Map<String, Long> methodCountMap) {
+    callQueueMethodCountsSummary.put(callQueueName, methodCountMap);
+  }
+
+  public long getCallMethodSize(String callQueueName, String methodName) {
+    long methodSize;
+
+    Map<String, Long> methodSizeMap = callQueueMethodSizeSummary.getOrDefault(callQueueName,
null);
+
+    if (null != methodSizeMap) {
+      methodSize = methodSizeMap.getOrDefault(methodName, 0L);
+    } else {
+      methodSize = 0L;
+    }
+
+    return methodSize;
+  }
+
+  void setCallMethodSize(String callQueueName, Map<String, Long> methodSizeMap) {
+    callQueueMethodSizeSummary.put(callQueueName, methodSizeMap);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
index 4ebfcd9..679f237 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java
@@ -21,12 +21,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DaemonThreadFactory;
+import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
 
 import java.io.IOException;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.HashMap;
 
 /**
  * A very simple {@code }RpcScheduler} that serves incoming requests in order.
@@ -70,6 +72,24 @@ public class FifoRpcScheduler extends RpcScheduler {
     this.executor.shutdown();
   }
 
+  private static class FifoCallRunner implements Runnable {
+    private final CallRunner callRunner;
+
+    FifoCallRunner(CallRunner cr) {
+      this.callRunner = cr;
+    }
+
+    CallRunner getCallRunner() {
+      return callRunner;
+    }
+
+    @Override
+    public void run() {
+      callRunner.run();
+    }
+
+  }
+
   @Override
   public boolean dispatch(final CallRunner task) throws IOException, InterruptedException
{
     // Executors provide no offer, so make our own.
@@ -78,7 +98,8 @@ public class FifoRpcScheduler extends RpcScheduler {
       queueSize.decrementAndGet();
       return false;
     }
-    executor.submit(new Runnable() {
+
+    executor.execute(new FifoCallRunner(task){
       @Override
       public void run() {
         task.setStatus(RpcServer.getStatus());
@@ -86,6 +107,7 @@ public class FifoRpcScheduler extends RpcScheduler {
         queueSize.decrementAndGet();
       }
     });
+
     return true;
   }
 
@@ -148,4 +170,37 @@ public class FifoRpcScheduler extends RpcScheduler {
   public int getActiveScanRpcHandlerCount() {
     return 0;
   }
+
+  @Override
+  public CallQueueInfo getCallQueueInfo() {
+    String queueName = "Fifo Queue";
+
+    HashMap<String, Long> methodCount = new HashMap<>();
+    HashMap<String, Long> methodSize = new HashMap<>();
+
+    CallQueueInfo callQueueInfo = new CallQueueInfo();
+    callQueueInfo.setCallMethodCount(queueName, methodCount);
+    callQueueInfo.setCallMethodSize(queueName, methodSize);
+
+
+    for (Runnable r:executor.getQueue()) {
+      FifoCallRunner mcr = (FifoCallRunner) r;
+      RpcCall rpcCall = mcr.getCallRunner().getRpcCall();
+
+      String method;
+
+      if (null==rpcCall.getMethod() ||
+            StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
+        method = "Unknown";
+      }
+
+      long size = rpcCall.getSize();
+
+      methodCount.put(method, 1 + methodCount.getOrDefault(method, 0L));
+      methodSize.put(method, size + methodSize.getOrDefault(method, 0L));
+    }
+
+    return callQueueInfo;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 5c017305..445a460 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -27,12 +27,15 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
+import java.util.Map;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.shaded.io.netty.util.internal.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
@@ -151,6 +154,50 @@ public abstract class RpcExecutor {
     return Math.max(1, (int) Math.round(handlerCount * callQueuesHandlersFactor));
   }
 
+  public Map<String, Long> getCallQueueCountsSummary() {
+    HashMap<String, Long> callQueueMethodTotalCount = new HashMap<>();
+
+    for(BlockingQueue<CallRunner> queue: queues) {
+      for (CallRunner cr:queue) {
+        RpcCall rpcCall = cr.getRpcCall();
+
+        String method;
+
+        if (null==rpcCall.getMethod() ||
+             StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
+          method = "Unknown";
+        }
+
+        callQueueMethodTotalCount.put(method, 1+callQueueMethodTotalCount.getOrDefault(method,
0L));
+      }
+    }
+
+    return callQueueMethodTotalCount;
+  }
+
+  public Map<String, Long> getCallQueueSizeSummary() {
+    HashMap<String, Long> callQueueMethodTotalSize = new HashMap<>();
+
+    for(BlockingQueue<CallRunner> queue: queues) {
+      for (CallRunner cr:queue) {
+        RpcCall rpcCall = cr.getRpcCall();
+        String method;
+
+        if (null==rpcCall.getMethod() ||
+          StringUtil.isNullOrEmpty(method = rpcCall.getMethod().getName())) {
+          method = "Unknown";
+        }
+
+        long size = rpcCall.getSize();
+
+        callQueueMethodTotalSize.put(method, size+callQueueMethodTotalSize.getOrDefault(method,
0L));
+      }
+    }
+
+    return callQueueMethodTotalSize;
+  }
+
+
   protected void initializeQueues(final int numQueues) {
     if (queueInitArgs.length > 0) {
       currentQueueLimit = (int) queueInitArgs[0];

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
index e060885..bd088a7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java
@@ -65,6 +65,9 @@ public abstract class RpcScheduler {
    */
   public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;
 
+  /** Get call queue information **/
+  public abstract CallQueueInfo getCallQueueInfo();
+
   /** Retrieves length of the general queue for metrics. */
   public abstract int getGeneralQueueLength();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index f40959a..817163c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -229,5 +229,33 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
   public int getActiveScanRpcHandlerCount() {
     return callExecutor.getActiveScanHandlerCount();
   }
+
+  @Override
+  public CallQueueInfo getCallQueueInfo() {
+    String queueName;
+
+    CallQueueInfo callQueueInfo = new CallQueueInfo();
+
+    if(null!=callExecutor) {
+      queueName = "Call Queue";
+      callQueueInfo.setCallMethodCount(queueName, callExecutor.getCallQueueCountsSummary());
+      callQueueInfo.setCallMethodSize(queueName, callExecutor.getCallQueueSizeSummary());
+    }
+
+    if(null!=priorityExecutor) {
+      queueName = "Priority Queue";
+      callQueueInfo.setCallMethodCount(queueName, priorityExecutor.getCallQueueCountsSummary());
+      callQueueInfo.setCallMethodSize(queueName, priorityExecutor.getCallQueueSizeSummary());
+    }
+
+    if(null!=replicationExecutor) {
+      queueName = "Replication Queue";
+      callQueueInfo.setCallMethodCount(queueName, replicationExecutor.getCallQueueCountsSummary());
+      callQueueInfo.setCallMethodSize(queueName, replicationExecutor.getCallQueueSizeSummary());
+    }
+
+    return callQueueInfo;
+  }
+
 }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
index 8690b70..2852ecf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSDumpServlet.java
@@ -26,6 +26,7 @@ import java.util.Date;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
+import org.apache.hadoop.hbase.ipc.CallQueueInfo;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.monitoring.LogMonitoring;
@@ -100,6 +101,10 @@ public class RSDumpServlet extends StateDumpServlet {
         dumpQueue(hrs, out);
       }
 
+      out.println("\n\nCall Queue Summary:");
+      out.println(LINE);
+      dumpCallQueues(hrs, out);
+
       out.flush();
     }
   }
@@ -136,4 +141,32 @@ public class RSDumpServlet extends StateDumpServlet {
       out.println(hrs.cacheFlusher.dumpQueue());
     }
   }
+
+
+  public static void dumpCallQueues(HRegionServer hrs, PrintWriter out) {
+    CallQueueInfo callQueueInfo = hrs.rpcServices.rpcServer.getScheduler().getCallQueueInfo();
+
+    for(String queueName: callQueueInfo.getCallQueueNames()) {
+
+      out.println("\nQueue Name: " + queueName);
+
+      long totalCallCount = 0L, totalCallSize = 0L;
+      for (String methodName: callQueueInfo.getCalledMethodNames(queueName)) {
+        long thisMethodCount, thisMethodSize;
+        thisMethodCount = callQueueInfo.getCallMethodCount(queueName, methodName);
+        thisMethodSize = callQueueInfo.getCallMethodSize(queueName, methodName);
+
+        out.println("Method in call: "+methodName);
+        out.println("Total call count for method: "+thisMethodCount);
+        out.println("Total call size for method (bytes): "+thisMethodSize);
+
+        totalCallCount += thisMethodCount;
+        totalCallSize += thisMethodSize;
+      }
+      out.println("Total call count for queue: "+totalCallCount);
+      out.println("Total call size for queue (bytes): "+totalCallSize);
+    }
+
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
index eb27c7a..3dfae82 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java
@@ -103,4 +103,9 @@ public class DelegatingRpcScheduler extends RpcScheduler {
   public int getActiveScanRpcHandlerCount() {
     return 0;
   }
+
+  @Override
+  public CallQueueInfo getCallQueueInfo() {
+    return delegate.getCallQueueInfo();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestFifoRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestFifoRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestFifoRpcScheduler.java
new file mode 100644
index 0000000..8611e48
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestFifoRpcScheduler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CategoryBasedTimeout;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RPCTests;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.InetSocketAddress;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+@Category({RPCTests.class, LargeTests.class})
+public class TestFifoRpcScheduler {
+  @Rule
+  public final TestRule timeout =
+      CategoryBasedTimeout.builder().withTimeout(this.getClass()).
+          withLookingForStuckThread(true).build();
+
+  private static final Log LOG = LogFactory.getLog(TestFifoRpcScheduler.class);
+
+  private AtomicInteger callExecutionCount;
+
+  private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
+    @Override
+    public InetSocketAddress getListenerAddress() {
+      return InetSocketAddress.createUnresolved("127.0.0.1", 1000);
+    }
+  };
+  private Configuration conf;
+
+  @Before
+  public void setUp() {
+    conf = HBaseConfiguration.create();
+    callExecutionCount = new AtomicInteger(0);
+  }
+
+  private ThreadPoolExecutor disableHandlers(RpcScheduler scheduler) {
+    ThreadPoolExecutor rpcExecutor=null;
+
+    try {
+      Field ExecutorField = scheduler.getClass().getDeclaredField("executor");
+      ExecutorField.setAccessible(true);
+
+      scheduler.start();
+      rpcExecutor = (ThreadPoolExecutor) ExecutorField.get(scheduler);
+
+      rpcExecutor.setMaximumPoolSize(1);
+      rpcExecutor.allowCoreThreadTimeOut(true);
+      rpcExecutor.setCorePoolSize(0);
+      rpcExecutor.setKeepAliveTime(1, TimeUnit.MICROSECONDS);
+
+      // Wait for 2 seconds, so that idle threads will die
+      Thread.sleep(2000);
+
+    } catch (NoSuchFieldException e) {
+      LOG.error("No such field exception:"+e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Illegal access exception:"+e);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted exception:"+e);
+    }
+
+    return rpcExecutor;
+  }
+
+  @Test
+  public void testCallQueueInfo() throws IOException, InterruptedException {
+
+    ThreadPoolExecutor rpcExecutor;
+    RpcScheduler scheduler = new FifoRpcScheduler(
+            conf, 1);
+
+    scheduler.init(CONTEXT);
+
+    // Set number of handlers to a minimum value
+    disableHandlers(scheduler);
+
+    int totalCallMethods = 30;
+    int unableToDispatch = 0;
+
+    for (int i = totalCallMethods; i>0; i--) {
+      CallRunner task = createMockTask();
+      task.setStatus(new MonitoredRPCHandlerImpl());
+
+      if(!scheduler.dispatch(task)) {
+        unableToDispatch++;
+      }
+
+      Thread.sleep(10);
+    }
+
+
+    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
+    int executionCount = callExecutionCount.get();
+    int callQueueSize = 0;
+
+    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
+      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
+        callQueueSize += callQueueInfo.getCallMethodCount(callQueueName, calledMethod);
+      }
+    }
+
+    assertEquals(totalCallMethods - unableToDispatch, callQueueSize + executionCount);
+
+    scheduler.stop();
+  }
+
+  private CallRunner createMockTask() {
+    ServerCall call = mock(ServerCall.class);
+    CallRunner task = mock(CallRunner.class);
+    when(task.getRpcCall()).thenReturn(call);
+
+    doAnswer(new Answer<Void>() {
+      @Override public Void answer (InvocationOnMock invocation) throws Throwable {
+        callExecutionCount.incrementAndGet();
+        Thread.sleep(1000);
+        return null;
+      }
+    }).when(task).run();
+
+    return task;
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7cdfbde3/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 18ab73f..8364b22 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableSet;
 import org.apache.hadoop.hbase.shaded.com.google.common.collect.Maps;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -112,6 +113,73 @@ public class TestSimpleRpcScheduler {
     scheduler.stop();
   }
 
+  private RpcScheduler disableHandlers(RpcScheduler scheduler) {
+    try {
+      Field ExecutorField = scheduler.getClass().getDeclaredField("callExecutor");
+      ExecutorField.setAccessible(true);
+
+      RpcExecutor rpcExecutor = (RpcExecutor)ExecutorField.get(scheduler);
+
+      Field handlerCountField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("handlerCount");
+
+      handlerCountField.setAccessible(true);
+      handlerCountField.set(rpcExecutor, 0);
+
+      Field numCallQueuesField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("numCallQueues");
+
+      numCallQueuesField.setAccessible(true);
+      numCallQueuesField.set(rpcExecutor, 1);
+
+      Field currentQueueLimitField = rpcExecutor.getClass().getSuperclass().getSuperclass().getDeclaredField("currentQueueLimit");
+
+      currentQueueLimitField.setAccessible(true);
+      currentQueueLimitField.set(rpcExecutor, 100);
+
+    } catch (NoSuchFieldException e) {
+      LOG.error("No such field exception"+e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Illegal access exception"+e);
+    }
+
+    return scheduler;
+  }
+
+  @Test
+  public void testCallQueueInfo() throws IOException, InterruptedException {
+
+    PriorityFunction qosFunction = mock(PriorityFunction.class);
+    RpcScheduler scheduler = new SimpleRpcScheduler(
+            conf, 0, 0, 0, qosFunction, 0);
+
+    scheduler.init(CONTEXT);
+
+    // Set the handlers to zero. So that number of requests in call Queue can be tested
+    scheduler = disableHandlers(scheduler);
+    scheduler.start();
+
+    int totalCallMethods = 10;
+    for (int i = totalCallMethods; i>0; i--) {
+      CallRunner task = createMockTask();
+      task.setStatus(new MonitoredRPCHandlerImpl());
+      scheduler.dispatch(task);
+    }
+
+
+    CallQueueInfo callQueueInfo = scheduler.getCallQueueInfo();
+
+    for (String callQueueName:callQueueInfo.getCallQueueNames()) {
+
+      for (String calledMethod: callQueueInfo.getCalledMethodNames(callQueueName)) {
+        assertEquals(callQueueInfo.getCallMethodCount(callQueueName, calledMethod), totalCallMethods);
+      }
+
+    }
+
+    scheduler.stop();
+
+  }
+
+
   @Test
   public void testHandlerIsolation() throws IOException, InterruptedException {
 


Mime
View raw message