accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ibe...@apache.org
Subject [accumulo] 01/09: fixes #1621: The ClientPool thread pool allows all core threads to time out * Added properties to allow overriding the allowCoreThreadTimeout in various threadpools: master incoming requests, tserver incoming requests, master bulk imports
Date Tue, 07 Jul 2020 14:57:17 GMT
This is an automated email from the ASF dual-hosted git repository.

ibella pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 352d2cd050411233148fd476531d93297b98d308
Author: Ivan Bella <ivan@bella.name>
AuthorDate: Thu Jun 4 18:17:27 2020 +0000

    fixes #1621: The ClientPool thread pool allows all core threads to time out
      * Added properties to allow overriding the allowCoreThreadTimeout in
        various threadpools: master incoming requests, tserver incoming requests, master bulk
imports
---
 .../core/clientImpl/TabletServerBatchReader.java   |   2 +-
 .../core/clientImpl/TabletServerBatchWriter.java   |   4 +-
 .../org/apache/accumulo/core/conf/Property.java    |   9 ++
 .../accumulo/core/util/SimpleThreadPool.java       |  13 ++-
 .../apache/accumulo/server/rpc/TServerUtils.java   | 121 ++++++++++++---------
 .../accumulo/server/util/TServerUtilsTest.java     |   3 +-
 .../apache/accumulo/gc/SimpleGarbageCollector.java |   2 +-
 .../java/org/apache/accumulo/master/Master.java    |   6 +-
 .../master/tableOps/bulkVer1/BulkImport.java       |   2 +-
 .../master/tableOps/bulkVer1/LoadFiles.java        |   6 +-
 .../master/tableOps/bulkVer2/BulkImportMove.java   |   2 +-
 .../tableOps/tableImport/MoveExportedFiles.java    |   2 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  16 +--
 .../org/apache/accumulo/tserver/log/LogSorter.java |   2 +-
 .../accumulo/tserver/log/TabletServerLogger.java   |   2 +-
 .../accumulo/test/BalanceWithOfflineTableIT.java   |   2 +-
 .../test/functional/BatchWriterFlushIT.java        |   2 +-
 .../accumulo/test/functional/ZombieTServer.java    |   2 +-
 .../accumulo/test/performance/NullTserver.java     |   2 +-
 19 files changed, 116 insertions(+), 84 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index 37909b8..d89b376 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -70,7 +70,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
     this.numThreads = numQueryThreads;
 
     queryThreadPool =
-        new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
+        new SimpleThreadPool(numQueryThreads, true, "batch scanner " + batchReaderInstance
+ "-");
     cleanable = CleanerUtil.unclosed(this, scopeClass, closed, log, queryThreadPool.asCloseable());
   }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index ff04bbf..1f02530 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -647,9 +647,9 @@ public class TabletServerBatchWriter implements AutoCloseable {
     public MutationWriter(int numSendThreads) {
       serversMutations = new HashMap<>();
       queued = new HashSet<>();
-      sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
+      sendThreadPool = new SimpleThreadPool(numSendThreads, true, this.getClass().getName());
       locators = new HashMap<>();
-      binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<>());
+      binningThreadPool = new SimpleThreadPool(1, true, "BinMutations", new SynchronousQueue<>());
       binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
     }
 
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c93402e..585f7be 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -248,6 +248,9 @@ public enum Property {
       "The number of attempts to bulk import a RFile before giving up."),
   MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT,
       "The number of threads to use when coordinating a bulk import."),
+  MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT("master.bulk.thread.timeout.allowed", "true",
+      PropertyType.BOOLEAN,
+      "True if the bulk import threads are allowed to timeout with no work available."),
   MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION,
       "The time to wait for a tablet server to process a bulk import request"),
   MASTER_RENAME_THREADS("master.rename.threadpool.size", "20", PropertyType.COUNT,
@@ -261,6 +264,9 @@ public enum Property {
       "Regular expression that defines the set of Tablet Servers that will perform bulk imports"),
   MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests."),
+  MASTER_MINTHREADS_ALLOW_TIMEOUT("master.server.thread.timeout.allowed", "true",
+      PropertyType.BOOLEAN,
+      "True if the incoming request threads are allowed to timeout with no work available."),
   MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
       "The time between adjustments of the server thread pool."),
   MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
@@ -497,6 +503,9 @@ public enum Property {
       "The time to wait for a tablet server to process a bulk import request."),
   TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT,
       "The minimum number of threads to use to handle incoming requests."),
+  TSERV_MINTHREADS_ALLOW_TIMEOUT("tserver.server.thread.timeout.allowed", "true",
+      PropertyType.BOOLEAN,
+      "True if the incoming request threads are allowed to timeout with no work available."),
   TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
       "The time between adjustments of the server thread pool."),
   TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.BYTES,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
index 1cc96ca..34f8359 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
@@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit;
  */
 public class SimpleThreadPool extends ThreadPoolExecutor {
 
-  public SimpleThreadPool(int max, final String name) {
-    super(max, max, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+  public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name)
{
+    super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
         new NamingThreadFactory(name));
-    allowCoreThreadTimeOut(true);
+    allowCoreThreadTimeOut(allowCoreThreadTimeOut);
   }
 
-  public SimpleThreadPool(int max, final String name, BlockingQueue<Runnable> queue)
{
-    super(max, max, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name));
-    allowCoreThreadTimeOut(true);
+  public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name,
+      BlockingQueue<Runnable> queue) {
+    super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name));
+    allowCoreThreadTimeOut(allowCoreThreadTimeOut);
   }
 
   /**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index a4a4a7b..33a3612 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -141,8 +141,8 @@ public class TServerUtils {
   public static ServerAddress startServer(MetricsSystem metricsSystem, ServerContext service,
       String hostname, Property portHintProperty, TProcessor processor, String serverName,
       String threadName, Property portSearchProperty, Property minThreadProperty,
-      Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
-      throws UnknownHostException {
+      Property allowCoreThreadTimeOutProperty, Property timeBetweenThreadChecksProperty,
+      Property maxMessageSizeProperty) throws UnknownHostException {
     final AccumuloConfiguration config = service.getConfiguration();
 
     final int[] portHint = config.getPort(portHintProperty);
@@ -152,6 +152,11 @@ public class TServerUtils {
       minThreads = config.getCount(minThreadProperty);
     }
 
+    boolean allowCoreThreadTimeOut = true;
+    if (allowCoreThreadTimeOutProperty != null) {
+      allowCoreThreadTimeOut = config.getBoolean(allowCoreThreadTimeOutProperty);
+    }
+
     long timeBetweenThreadChecks = 1000;
     if (timeBetweenThreadChecksProperty != null) {
       timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
@@ -184,9 +189,9 @@ public class TServerUtils {
     HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
     try {
       return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
-          minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
-          service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis(),
-          addresses);
+          minThreads, allowCoreThreadTimeOut, simpleTimerThreadpoolSize, timeBetweenThreadChecks,
+          maxMessageSize, service.getServerSslParams(), service.getSaslParams(),
+          service.getClientTimeoutInMillis(), addresses);
     } catch (TTransportException e) {
       if (portSearch) {
         // Build a list of reserved ports - as identified by properties of type PropertyType.PORT
@@ -209,9 +214,9 @@ public class TServerUtils {
           try {
             HostAndPort addr = HostAndPort.fromParts(hostname, port);
             return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
-                minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
-                service.getServerSslParams(), service.getSaslParams(),
-                service.getClientTimeoutInMillis(), addr);
+                minThreads, allowCoreThreadTimeOut, simpleTimerThreadpoolSize,
+                timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(),
+                service.getSaslParams(), service.getClientTimeoutInMillis(), addr);
           } catch (TTransportException tte) {
             log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
           }
@@ -231,8 +236,8 @@ public class TServerUtils {
    */
   public static ServerAddress createThreadedSelectorServer(HostAndPort address,
       TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
-      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks,
-      long maxMessageSize) throws TTransportException {
+      final int numThreads, final boolean allowCoreThreadTimeOut, final int numSTThreads,
+      long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
 
     final TNonblockingServerSocket transport =
         new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
@@ -247,8 +252,8 @@ public class TServerUtils {
     options.stopTimeoutVal(5);
 
     // Create our own very special thread pool.
-    ThreadPoolExecutor pool =
-        createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+        allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
 
     options.executorService(pool);
     options.processorFactory(new TProcessorFactory(processor));
@@ -266,8 +271,8 @@ public class TServerUtils {
    */
   public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
       TProtocolFactory protocolFactory, final String serverName, final int numThreads,
-      final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
-      throws TTransportException {
+      final boolean allowCoreThreadTimeOut, final int numSTThreads, long timeBetweenThreadChecks,
+      long maxMessageSize) throws TTransportException {
 
     final TNonblockingServerSocket transport =
         new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
@@ -279,8 +284,8 @@ public class TServerUtils {
     options.stopTimeoutVal(5);
 
     // Create our own very special thread pool.
-    ThreadPoolExecutor pool =
-        createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+        allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
 
     options.executorService(pool);
     options.processorFactory(new TProcessorFactory(processor));
@@ -300,7 +305,11 @@ public class TServerUtils {
    * @param serverName
    *          A name to describe the thrift server this executor will service
    * @param executorThreads
-   *          The maximum number of threads for the executor
+   *          The minimum number of threads for the executor
+   * @param allowCoreThreadTimeOut
+   *          If true, then all threads are allowed to terminate effectively setting the
minimum to
+   *          0. Otherwise the core threads defined by executorThreads will always stay around
+   *          waiting for work.
    * @param simpleTimerThreads
    *          The numbers of threads used to get the {@link SimpleTimer} instance
    * @param timeBetweenThreadChecks
@@ -308,8 +317,10 @@ public class TServerUtils {
    * @return A {@link ThreadPoolExecutor} which will resize itself automatically
    */
   public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName,
-      final int executorThreads, int simpleTimerThreads, long timeBetweenThreadChecks) {
-    final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
+      final int executorThreads, boolean allowCoreThreadTimeOut, int simpleTimerThreads,
+      long timeBetweenThreadChecks) {
+    final ThreadPoolExecutor pool =
+        new SimpleThreadPool(executorThreads, allowCoreThreadTimeOut, "ClientPool");
     // periodically adjust the number of threads we need by checking how busy our threads
are
     SimpleTimer.getInstance(simpleTimerThreads).schedule(() -> {
       // there is a minor race condition between sampling the current state of the thread
pool and
@@ -347,13 +358,14 @@ public class TServerUtils {
    */
   public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor,
       TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads,
-      int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException
{
+      boolean allowCoreThreadTimeOut, int numSimpleTimerThreads, long timeBetweenThreadChecks)
+      throws TTransportException {
 
     InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
     // Must use an ISA, providing only a port would ignore the hostname given
     TServerSocket transport = new TServerSocket(isa);
     ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
-        numSimpleTimerThreads, timeBetweenThreadChecks);
+        allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks);
     TThreadPoolServer server = createTThreadPoolServer(transport, processor,
         ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
 
@@ -457,8 +469,8 @@ public class TServerUtils {
    */
   public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
       TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams,
-      String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks)
-      throws TTransportException {
+      String serverName, int numThreads, boolean allowCoreThreadTimeOut, int numSimpleTimerThreads,
+      long timeBetweenThreadChecks) throws TTransportException {
     TServerSocket transport;
     try {
       transport = getSslServerSocket(address.getPort(), (int) socketTimeout,
@@ -474,7 +486,7 @@ public class TServerUtils {
     }
 
     ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
-        numSimpleTimerThreads, timeBetweenThreadChecks);
+        allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks);
 
     return new ServerAddress(createTThreadPoolServer(transport, processor,
         ThriftUtil.transportFactory(), protocolFactory, pool), address);
@@ -482,8 +494,8 @@ public class TServerUtils {
 
   public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor
processor,
       TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
-      final String serverName, final int numThreads, final int numSTThreads,
-      long timeBetweenThreadChecks) throws TTransportException {
+      final String serverName, final int numThreads, final boolean allowCoreThreadTimeOut,
+      final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException {
     // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that
the
     // TThreadPoolServer does,
     // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when
it
@@ -564,8 +576,8 @@ public class TServerUtils {
       log.info("SASL thrift server bound on {}", address);
     }
 
-    ThreadPoolExecutor pool =
-        createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+        allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
 
     final TThreadPoolServer server =
         createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory,
pool);
@@ -575,9 +587,10 @@ public class TServerUtils {
 
   public static ServerAddress startTServer(MetricsSystem metricsSystem, AccumuloConfiguration
conf,
       ThriftServerType serverType, TProcessor processor, String serverName, String threadName,
-      int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
-      SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
-      long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
+      int numThreads, boolean allowCoreThreadTimeOut, int numSTThreads,
+      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+      SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses)
+      throws TTransportException {
 
     if (serverType == ThriftServerType.SASL) {
       processor = updateSaslProcessor(serverType, processor);
@@ -585,22 +598,23 @@ public class TServerUtils {
 
     return startTServer(serverType,
         new TimedProcessor(metricsSystem, conf, processor, serverName, threadName), serverName,
-        threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams,
-        saslParams, serverSocketTimeout, addresses);
+        threadName, numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks,
+        maxMessageSize, sslParams, saslParams, serverSocketTimeout, addresses);
   }
 
   /**
    * @see #startTServer(ThriftServerType, TimedProcessor, TProtocolFactory, String, String,
int,
-   *      int, long, long, SslConnectionParams, SaslServerConnectionParams, long, HostAndPort...)
+   *      boolean, int, long, long, SslConnectionParams, SaslServerConnectionParams, long,
+   *      HostAndPort...)
    */
   public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
-      String serverName, String threadName, int numThreads, int numSTThreads,
-      long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
-      SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses)
-      throws TTransportException {
+      String serverName, String threadName, int numThreads, boolean allowCoreThreadTimeOut,
+      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
+      SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+      long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
     return startTServer(serverType, processor, ThriftUtil.protocolFactory(), serverName,
threadName,
-        numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams,
-        serverSocketTimeout, addresses);
+        numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize,
+        sslParams, saslParams, serverSocketTimeout, addresses);
   }
 
   /**
@@ -612,8 +626,8 @@ public class TServerUtils {
    */
   public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
       TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads,
-      int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
-      SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+      boolean allowCoreThreadTimeOut, int numSTThreads, long timeBetweenThreadChecks,
+      long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
       long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
 
     // This is presently not supported. It's hypothetically possible, I believe, to work,
but it
@@ -629,30 +643,33 @@ public class TServerUtils {
         switch (serverType) {
           case SSL:
             log.debug("Instantiating SSL Thrift server");
-            serverAddress =
-                createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout,
-                    sslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+            serverAddress = createSslThreadPoolServer(address, processor, protocolFactory,
+                serverSocketTimeout, sslParams, serverName, numThreads, allowCoreThreadTimeOut,
+                numSTThreads, timeBetweenThreadChecks);
             break;
           case SASL:
             log.debug("Instantiating SASL Thrift server");
-            serverAddress =
-                createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout,
-                    saslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+            serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory,
+                serverSocketTimeout, saslParams, serverName, numThreads, allowCoreThreadTimeOut,
+                numSTThreads, timeBetweenThreadChecks);
             break;
           case THREADPOOL:
             log.debug("Instantiating unsecure TThreadPool Thrift server");
             serverAddress = createBlockingServer(address, processor, protocolFactory,
-                maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+                maxMessageSize, serverName, numThreads, allowCoreThreadTimeOut, numSTThreads,
+                timeBetweenThreadChecks);
             break;
           case THREADED_SELECTOR:
             log.debug("Instantiating default, unsecure Threaded selector Thrift server");
             serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
-                serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+                serverName, numThreads, allowCoreThreadTimeOut, numSTThreads,
+                timeBetweenThreadChecks, maxMessageSize);
             break;
           case CUSTOM_HS_HA:
             log.debug("Instantiating unsecure custom half-async Thrift server");
-            serverAddress = createNonBlockingServer(address, processor, protocolFactory,
serverName,
-                numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+            serverAddress =
+                createNonBlockingServer(address, processor, protocolFactory, serverName,
numThreads,
+                    allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
             break;
           default:
             throw new IllegalArgumentException("Unknown server type " + serverType);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index a7585ff..4429651 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -392,7 +392,8 @@ public class TServerUtilsTest {
 
     return TServerUtils.startServer(Metrics.initSystem(getClass().getSimpleName()), ctx,
hostname,
         Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread",
-        Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK,
+        Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS,
+        Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK,
         Property.GENERAL_MAX_MESSAGE_SIZE);
 
   }
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 65830df..edb0803 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -653,7 +653,7 @@ public class SimpleGarbageCollector extends AbstractServer implements
Iface {
     try {
       ServerAddress server = TServerUtils.startTServer(getMetricsSystem(), getConfiguration(),
           getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
-          "GC Monitor Service", 2,
+          "GC Monitor Service", 2, true,
           getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000,
           maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(),
0,
           addresses);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 212cda8..a76f9dd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1012,8 +1012,8 @@ public class Master extends AbstractServer
     try {
       sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
           Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler",
null,
-          Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
-          Property.GENERAL_MAX_MESSAGE_SIZE);
+          Property.MASTER_MINTHREADS, Property.MASTER_MINTHREADS_ALLOW_TIMEOUT,
+          Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
     } catch (UnknownHostException e) {
       throw new IllegalStateException("Unable to start server on host " + getHostname(),
e);
     }
@@ -1329,7 +1329,7 @@ public class Master extends AbstractServer
     ServerAddress replAddress = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
         Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
         "Master Replication Coordinator", "Replication Coordinator", null,
-        Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+        Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, null,
         Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
 
     log.info("Started replication coordinator service at " + replAddress.address);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
index 9e5b736..850c837 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
@@ -206,7 +206,7 @@ public class BulkImport extends MasterRepo {
     @SuppressWarnings("deprecation")
     int workerCount = serverConfig.getCount(
         serverConfig.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS));
-    SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move");
+    SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulk move");
     List<Future<Exception>> results = new ArrayList<>();
 
     for (FileStatus file : mapFiles) {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
index ad1a155..0029779 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
@@ -93,8 +93,10 @@ class LoadFiles extends MasterRepo {
   private static synchronized ExecutorService getThreadPool(Master master) {
     if (threadPool == null) {
       int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
-      ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
-      pool.allowCoreThreadTimeOut(true);
+      boolean allowCoreThreadTimeOut =
+          master.getConfiguration().getBoolean(Property.MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT);
+      ThreadPoolExecutor pool =
+          new SimpleThreadPool(threadPoolSize, allowCoreThreadTimeOut, "bulk import");
       threadPool = new TraceExecutorService(pool);
     }
     return threadPool;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
index 1b63213..a5e5690 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
@@ -113,7 +113,7 @@ class BulkImportMove extends MasterRepo {
     @SuppressWarnings("deprecation")
     int workerCount = aConf.getCount(
         aConf.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS));
-    SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulkDir move");
+    SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulkDir move");
     List<Future<Boolean>> results = new ArrayList<>();
 
     String fmtTid = FateTxId.formatTid(tid);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
index acea637..401f0ed 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
@@ -64,7 +64,7 @@ class MoveExportedFiles extends MasterRepo {
     String fmtTid = FateTxId.formatTid(tid);
 
     int workerCount = master.getConfiguration().getCount(Property.MASTER_RENAME_THREADS);
-    SimpleThreadPool workers = new SimpleThreadPool(workerCount, "importtable rename");
+    SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "importtable rename");
     List<Future<Boolean>> results = new ArrayList<>();
 
     VolumeManager fs = master.getVolumeManager();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 8b6047f..fc3c248 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -538,7 +538,8 @@ public class TabletServer extends AbstractServer {
     ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), address,
         Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
         "Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS,
-        Property.TSERV_THREADCHECK, maxMessageSizeProperty);
+        Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK,
+        maxMessageSizeProperty);
     this.server = sp.server;
     return sp.address;
   }
@@ -602,10 +603,11 @@ public class TabletServer extends AbstractServer {
     Property maxMessageSizeProperty =
         getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null
             ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE;
-    ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(),
-        clientAddress.getHost(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
-        "ReplicationServicerHandler", "Replication Servicer", Property.TSERV_PORTSEARCH,
-        Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
+    ServerAddress sp =
+        TServerUtils.startServer(getMetricsSystem(), getContext(), clientAddress.getHost(),
+            Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler",
+            "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS,
+            null, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
     this.replServer = sp.server;
     log.info("Started replication service on {}", sp.address);
 
@@ -748,7 +750,7 @@ public class TabletServer extends AbstractServer {
     }
 
     ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(
-        getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+        getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), true, "distributed work queue");
 
     bulkFailedCopyQ = new DistributedWorkQueue(
         getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
@@ -891,7 +893,7 @@ public class TabletServer extends AbstractServer {
 
     // Start the pool to handle outgoing replications
     final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(
-        getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
+        getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), true, "replication task");
     replWorker.setExecutor(replicationThreadPool);
     replWorker.run();
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index e0d52ce..0c6e749 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -223,7 +223,7 @@ public class LogSorter {
     this.fs = fs;
     this.conf = conf;
     int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
-    this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+    this.threadPool = new SimpleThreadPool(threadPoolSize, true, this.getClass().getName());
     this.walBlockSize = DfsLogger.getWalBlockSize(conf);
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 8777d6f..8469bb7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -264,7 +264,7 @@ public class TabletServerLogger {
     if (nextLogMaker != null) {
       return;
     }
-    nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+    nextLogMaker = new SimpleThreadPool(1, true, "WALog creator");
     nextLogMaker.submit(new LoggingRunnable(log, new Runnable() {
       @Override
       public void run() {
diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
index 9010deb..e02ebeb 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
@@ -76,7 +76,7 @@ public class BalanceWithOfflineTableIT extends ConfigurableMacBase {
 
       log.info("Waiting for balance");
 
-      SimpleThreadPool pool = new SimpleThreadPool(1, "waitForBalance");
+      SimpleThreadPool pool = new SimpleThreadPool(1, true, "waitForBalance");
       Future<Boolean> wait = pool.submit(() -> {
         c.instanceOperations().waitForBalance();
         return true;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 8587efa..01a49b7 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -209,7 +209,7 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
         allMuts.add(muts);
       }
 
-      SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, "ClientThreads");
+      SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, true, "ClientThreads");
       threads.allowCoreThreadTimeOut(false);
       threads.prestartAllCoreThreads();
 
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 899d6b8..d50d1f1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -109,7 +109,7 @@ public class ZombieTServer {
     Processor<Iface> processor = new Processor<>(tch);
     ServerAddress serverPort = TServerUtils.startTServer(
         Metrics.initSystem(ZombieTServer.class.getSimpleName()), context.getConfiguration(),
-        ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, 1, 1000,
+        ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, true, 1, 1000,
         10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port));
 
     String addressString = serverPort.address.toString();
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 9ce7525..2e807e5 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -303,7 +303,7 @@ public class NullTserver {
     Processor<Iface> processor = new Processor<>(tch);
     TServerUtils.startTServer(Metrics.initSystem(NullTserver.class.getSimpleName()),
         context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer",
-        "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1,
+        "null tserver", 2, true, 1, 1000, 10 * 1024 * 1024, null, null, -1,
         HostAndPort.fromParts("0.0.0.0", opts.port));
 
     HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);


Mime
View raw message