hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mberto...@apache.org
Subject [46/50] [abbrv] hbase git commit: HBASE-13700 Change Thrift2 so that number of woker threads can be passed in
Date Thu, 21 May 2015 16:43:29 GMT
HBASE-13700 Change Thrift2 so that number of woker threads can be passed in


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/3a1e101d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/3a1e101d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/3a1e101d

Branch: refs/heads/hbase-12439
Commit: 3a1e101dcd4cf0e7149c0d8b1c13f96adf04278e
Parents: 1bfe387
Author: Elliott Clark <eclark@apache.org>
Authored: Fri May 15 16:30:57 2015 -0700
Committer: Elliott Clark <eclark@apache.org>
Committed: Wed May 20 13:43:46 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/thrift2/ThriftServer.java      | 43 +++++++++++++++++---
 1 file changed, 37 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/3a1e101d/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 72e9117..66ecc18 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -127,6 +127,7 @@ public class ThriftServer {
     options.addOption("p", "port", true, "Port to bind to [default: " + DEFAULT_LISTEN_PORT
+ "]");
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
+    options.addOption("w", "workers", true, "How many worker threads to use.");
     options.addOption("h", "help", false, "Print help information");
     options.addOption(null, "infoport", true, "Port for web UI");
 
@@ -233,11 +234,15 @@ public class ThriftServer {
 
   private static TServer getTHsHaServer(TProtocolFactory protocolFactory,
       TProcessor processor, TTransportFactory transportFactory,
+      int workerThreads,
       InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
       throws TTransportException {
     TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
     log.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
     THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+    if (workerThreads > 0) {
+      serverArgs.workerThreads(workerThreads);
+    }
     ExecutorService executorService = createExecutor(
         serverArgs.getWorkerThreads(), metrics);
     serverArgs.executorService(executorService);
@@ -254,18 +259,27 @@ public class ThriftServer {
     ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
     tfb.setDaemon(true);
     tfb.setNameFormat("thrift2-worker-%d");
-    return new ThreadPoolExecutor(workerThreads, workerThreads,
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(workerThreads, workerThreads,
             Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build());
+    pool.prestartAllCoreThreads();
+    return pool;
   }
 
-  private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor
processor,
-      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws TTransportException
{
+  private static TServer getTThreadPoolServer(TProtocolFactory protocolFactory,
+                                              TProcessor processor,
+                                              TTransportFactory transportFactory,
+                                              int workerThreads,
+                                              InetSocketAddress inetSocketAddress)
+      throws TTransportException {
     TServerTransport serverTransport = new TServerSocket(inetSocketAddress);
     log.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
     TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
     serverArgs.processor(processor);
     serverArgs.transportFactory(transportFactory);
     serverArgs.protocolFactory(protocolFactory);
+    if (workerThreads > 0) {
+      serverArgs.maxWorkerThreads(workerThreads);
+    }
     return new TThreadPoolServer(serverArgs);
   }
 
@@ -298,6 +312,7 @@ public class ThriftServer {
     Options options = getOptions();
     Configuration conf = HBaseConfiguration.create();
     CommandLine cmd = parseArguments(conf, options, args);
+    int workerThreads = 0;
 
     /**
      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start"
and "stop" arguments hbase
@@ -414,6 +429,10 @@ public class ThriftServer {
       };
     }
 
+    if (cmd.hasOption("w")) {
+      workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
+    }
+
     // check for user-defined info server port setting, if so override the conf
     try {
       if (cmd.hasOption("infoport")) {
@@ -438,11 +457,23 @@ public class ThriftServer {
     }
 
     if (nonblocking) {
-      server = getTNonBlockingServer(protocolFactory, processor, transportFactory, inetSocketAddress);
+      server = getTNonBlockingServer(protocolFactory,
+          processor,
+          transportFactory,
+          inetSocketAddress);
     } else if (hsha) {
-      server = getTHsHaServer(protocolFactory, processor, transportFactory, inetSocketAddress,
metrics);
+      server = getTHsHaServer(protocolFactory,
+          processor,
+          transportFactory,
+          workerThreads,
+          inetSocketAddress,
+          metrics);
     } else {
-      server = getTThreadPoolServer(protocolFactory, processor, transportFactory, inetSocketAddress);
+      server = getTThreadPoolServer(protocolFactory,
+          processor,
+          transportFactory,
+          workerThreads,
+          inetSocketAddress);
     }
 
     final TServer tserver = server;


Mime
View raw message