hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From anto...@apache.org
Subject hbase git commit: HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size
Date Wed, 13 Apr 2016 19:32:13 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 8f9e46a64 -> 0bb18de91


HBASE-15637 TSHA Thrift-2 server should allow limiting call queue size


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0bb18de9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0bb18de9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0bb18de9

Branch: refs/heads/master
Commit: 0bb18de91c69ec43dc5118e59035686c586f3372
Parents: 8f9e46a
Author: Mikhail Antonov <antonov@apache.org>
Authored: Tue Apr 12 16:39:30 2016 -0700
Committer: Mikhail Antonov <antonov@apache.org>
Committed: Wed Apr 13 12:31:46 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift2/ThriftServer.java      | 23 +++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/0bb18de9/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 695c74b..b606500 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -143,6 +143,8 @@ public class ThriftServer extends Configured implements Tool {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("w", "workers", true, "How many worker threads to use.");
+    options.addOption("q", "callQueueSize", true,
+      "Max size of request queue (unbounded by default)");
     options.addOption("h", "help", false, "Print help information");
     options.addOption(null, "infoport", true, "Port for web UI");
     options.addOption("t", READ_TIMEOUT_OPTION, true,
@@ -251,7 +253,7 @@ public class ThriftServer extends Configured implements Tool {
 
   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
       TProcessor processor, TTransportFactory transportFactory,
-      int workerThreads,
+      int workerThreads, int maxCallQueueSize,
       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
       throws TTransportException {
     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
@@ -262,7 +264,7 @@ public class ThriftServer extends Configured implements Tool {
       serverArgs.minWorkerThreads(workerThreads).maxWorkerThreads(workerThreads);
     }
     ExecutorService executorService = createExecutor(
-        workerThreads, metrics);
+        workerThreads, maxCallQueueSize, metrics);
     serverArgs.executorService(executorService);
     serverArgs.processor(processor);
     serverArgs.transportFactory(transportFactory);
@@ -271,9 +273,14 @@ public class ThriftServer extends Configured implements Tool {
   }
 
   private static ExecutorService createExecutor(
-      int workerThreads, ThriftMetrics metrics) {
-    CallQueue callQueue = new CallQueue(
-        new LinkedBlockingQueue<Call>(), metrics);
+      int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
+    CallQueue callQueue;
+    if (maxCallQueueSize > 0) {
+      callQueue = new CallQueue(new LinkedBlockingQueue<Call>(maxCallQueueSize), metrics);
+    } else {
+      callQueue = new CallQueue(new LinkedBlockingQueue<Call>(), metrics);
+    }
+
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift2-worker-%d");
@@ -342,6 +349,7 @@ public class ThriftServer extends Configured implements Tool {
     Options options = getOptions();
     CommandLine cmd = parseArguments(conf, options, args);
     int workerThreads = 0;
+    int maxCallQueueSize = -1; // use unbounded queue by default
 
     /**
     * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
@@ -475,6 +483,10 @@ public class ThriftServer extends Configured implements Tool {
       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
     }
 
+    if (cmd.hasOption("q")) {
+      maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
+    }
+
     // check for user-defined info server port setting, if so override the conf
     try {
       if (cmd.hasOption("infoport")) {
@@ -508,6 +520,7 @@ public class ThriftServer extends Configured implements Tool {
           processor,
           transportFactory,
           workerThreads,
+          maxCallQueueSize,
           inetSocketAddress,
           metrics);
     } else {


Mime
View raw message