From commits-return-23916-archive-asf-public=cust-asf.ponee.io@accumulo.apache.org Tue Jul 7 14:57:18 2020 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by mx-eu-01.ponee.io (Postfix) with SMTP id 95A2A180675 for ; Tue, 7 Jul 2020 16:57:17 +0200 (CEST) Received: (qmail 23305 invoked by uid 500); 7 Jul 2020 14:57:16 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 23264 invoked by uid 99); 7 Jul 2020 14:57:16 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 Jul 2020 14:57:16 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 6879B890B9; Tue, 7 Jul 2020 14:57:16 +0000 (UTC) Date: Tue, 07 Jul 2020 14:57:17 +0000 To: "commits@accumulo.apache.org" Subject: [accumulo] 01/09: fixes #1621: The ClientPool thread pool allows all core threads to time out * Added properties to allow overriding the allowCoreThreadTimeout in various threadpools: master incoming requests, tserver incoming requests, master bulk imports MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit From: ibella@apache.org In-Reply-To: <159413383620.16080.14735180874112104032@gitbox.apache.org> References: <159413383620.16080.14735180874112104032@gitbox.apache.org> X-Git-Host: gitbox.apache.org X-Git-Repo: accumulo X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Rev: 352d2cd050411233148fd476531d93297b98d308 X-Git-NotificationType: diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated Message-Id: <20200707145716.6879B890B9@gitbox.apache.org> This is an automated email from 
the ASF dual-hosted git repository. ibella pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/accumulo.git commit 352d2cd050411233148fd476531d93297b98d308 Author: Ivan Bella AuthorDate: Thu Jun 4 18:17:27 2020 +0000 fixes #1621: The ClientPool thread pool allows all core threads to time out * Added properties to allow overriding the allowCoreThreadTimeout in various threadpools: master incoming requests, tserver incoming requests, master bulk imports --- .../core/clientImpl/TabletServerBatchReader.java | 2 +- .../core/clientImpl/TabletServerBatchWriter.java | 4 +- .../org/apache/accumulo/core/conf/Property.java | 9 ++ .../accumulo/core/util/SimpleThreadPool.java | 13 ++- .../apache/accumulo/server/rpc/TServerUtils.java | 121 ++++++++++++--------- .../accumulo/server/util/TServerUtilsTest.java | 3 +- .../apache/accumulo/gc/SimpleGarbageCollector.java | 2 +- .../java/org/apache/accumulo/master/Master.java | 6 +- .../master/tableOps/bulkVer1/BulkImport.java | 2 +- .../master/tableOps/bulkVer1/LoadFiles.java | 6 +- .../master/tableOps/bulkVer2/BulkImportMove.java | 2 +- .../tableOps/tableImport/MoveExportedFiles.java | 2 +- .../org/apache/accumulo/tserver/TabletServer.java | 16 +-- .../org/apache/accumulo/tserver/log/LogSorter.java | 2 +- .../accumulo/tserver/log/TabletServerLogger.java | 2 +- .../accumulo/test/BalanceWithOfflineTableIT.java | 2 +- .../test/functional/BatchWriterFlushIT.java | 2 +- .../accumulo/test/functional/ZombieTServer.java | 2 +- .../accumulo/test/performance/NullTserver.java | 2 +- 19 files changed, 116 insertions(+), 84 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java index 37909b8..d89b376 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java +++ 
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java @@ -70,7 +70,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan this.numThreads = numQueryThreads; queryThreadPool = - new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-"); + new SimpleThreadPool(numQueryThreads, true, "batch scanner " + batchReaderInstance + "-"); cleanable = CleanerUtil.unclosed(this, scopeClass, closed, log, queryThreadPool.asCloseable()); } diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java index ff04bbf..1f02530 100644 --- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java +++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java @@ -647,9 +647,9 @@ public class TabletServerBatchWriter implements AutoCloseable { public MutationWriter(int numSendThreads) { serversMutations = new HashMap<>(); queued = new HashSet<>(); - sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName()); + sendThreadPool = new SimpleThreadPool(numSendThreads, true, this.getClass().getName()); locators = new HashMap<>(); - binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<>()); + binningThreadPool = new SimpleThreadPool(1, true, "BinMutations", new SynchronousQueue<>()); binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); } diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index c93402e..585f7be 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -248,6 +248,9 @@ public enum Property { "The number of attempts to bulk import a RFile before giving up."), 
MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT, "The number of threads to use when coordinating a bulk import."), + MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT("master.bulk.thread.timeout.allowed", "true", + PropertyType.BOOLEAN, + "True if the bulk import threads are allowed to timeout with no work available."), MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION, "The time to wait for a tablet server to process a bulk import request"), MASTER_RENAME_THREADS("master.rename.threadpool.size", "20", PropertyType.COUNT, @@ -261,6 +264,9 @@ public enum Property { "Regular expression that defines the set of Tablet Servers that will perform bulk imports"), MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), + MASTER_MINTHREADS_ALLOW_TIMEOUT("master.server.thread.timeout.allowed", "true", + PropertyType.BOOLEAN, + "True if the incoming request threads are allowed to timeout with no work available."), MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION, @@ -497,6 +503,9 @@ public enum Property { "The time to wait for a tablet server to process a bulk import request."), TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT, "The minimum number of threads to use to handle incoming requests."), + TSERV_MINTHREADS_ALLOW_TIMEOUT("tserver.server.thread.timeout.allowed", "true", + PropertyType.BOOLEAN, + "True if the incoming request threads are allowed to timeout with no work available."), TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the server thread pool."), TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.BYTES, diff --git 
a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java index 1cc96ca..34f8359 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java +++ b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java @@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit; */ public class SimpleThreadPool extends ThreadPoolExecutor { - public SimpleThreadPool(int max, final String name) { - super(max, max, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), + public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name) { + super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), new NamingThreadFactory(name)); - allowCoreThreadTimeOut(true); + allowCoreThreadTimeOut(allowCoreThreadTimeOut); } - public SimpleThreadPool(int max, final String name, BlockingQueue queue) { - super(max, max, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name)); - allowCoreThreadTimeOut(true); + public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name, + BlockingQueue queue) { + super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name)); + allowCoreThreadTimeOut(allowCoreThreadTimeOut); } /** diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java index a4a4a7b..33a3612 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java @@ -141,8 +141,8 @@ public class TServerUtils { public static ServerAddress startServer(MetricsSystem metricsSystem, ServerContext service, String hostname, Property portHintProperty, TProcessor processor, String serverName, String threadName, Property portSearchProperty, Property minThreadProperty, - Property 
timeBetweenThreadChecksProperty, Property maxMessageSizeProperty) - throws UnknownHostException { + Property allowCoreThreadTimeOutProperty, Property timeBetweenThreadChecksProperty, + Property maxMessageSizeProperty) throws UnknownHostException { final AccumuloConfiguration config = service.getConfiguration(); final int[] portHint = config.getPort(portHintProperty); @@ -152,6 +152,11 @@ public class TServerUtils { minThreads = config.getCount(minThreadProperty); } + boolean allowCoreThreadTimeOut = true; + if (allowCoreThreadTimeOutProperty != null) { + allowCoreThreadTimeOut = config.getBoolean(allowCoreThreadTimeOutProperty); + } + long timeBetweenThreadChecks = 1000; if (timeBetweenThreadChecksProperty != null) { timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty); @@ -184,9 +189,9 @@ public class TServerUtils { HostAndPort[] addresses = getHostAndPorts(hostname, portHint); try { return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, - minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize, - service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis(), - addresses); + minThreads, allowCoreThreadTimeOut, simpleTimerThreadpoolSize, timeBetweenThreadChecks, + maxMessageSize, service.getServerSslParams(), service.getSaslParams(), + service.getClientTimeoutInMillis(), addresses); } catch (TTransportException e) { if (portSearch) { // Build a list of reserved ports - as identified by properties of type PropertyType.PORT @@ -209,9 +214,9 @@ public class TServerUtils { try { HostAndPort addr = HostAndPort.fromParts(hostname, port); return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName, - minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize, - service.getServerSslParams(), service.getSaslParams(), - service.getClientTimeoutInMillis(), addr); + minThreads, allowCoreThreadTimeOut, 
simpleTimerThreadpoolSize, + timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(), + service.getSaslParams(), service.getClientTimeoutInMillis(), addr); } catch (TTransportException tte) { log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName); } @@ -231,8 +236,8 @@ public class TServerUtils { */ public static ServerAddress createThreadedSelectorServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName, - final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, - long maxMessageSize) throws TTransportException { + final int numThreads, final boolean allowCoreThreadTimeOut, final int numSTThreads, + long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { final TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort())); @@ -247,8 +252,8 @@ public class TServerUtils { options.stopTimeoutVal(5); // Create our own very special thread pool. 
- ThreadPoolExecutor pool = - createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, + allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks); options.executorService(pool); options.processorFactory(new TProcessorFactory(processor)); @@ -266,8 +271,8 @@ public class TServerUtils { */ public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, final String serverName, final int numThreads, - final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) - throws TTransportException { + final boolean allowCoreThreadTimeOut, final int numSTThreads, long timeBetweenThreadChecks, + long maxMessageSize) throws TTransportException { final TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort())); @@ -279,8 +284,8 @@ public class TServerUtils { options.stopTimeoutVal(5); // Create our own very special thread pool. - ThreadPoolExecutor pool = - createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, + allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks); options.executorService(pool); options.processorFactory(new TProcessorFactory(processor)); @@ -300,7 +305,11 @@ public class TServerUtils { * @param serverName * A name to describe the thrift server this executor will service * @param executorThreads - * The maximum number of threads for the executor + * The minimum number of threads for the executor + * @param allowCoreThreadTimeOut + * If true, then all threads are allowed to terminate, effectively setting the minimum to + * 0. Otherwise the core threads defined by executorThreads will always stay around + * waiting for work. 
* @param simpleTimerThreads * The numbers of threads used to get the {@link SimpleTimer} instance * @param timeBetweenThreadChecks @@ -308,8 +317,10 @@ public class TServerUtils { * @return A {@link ThreadPoolExecutor} which will resize itself automatically */ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, - final int executorThreads, int simpleTimerThreads, long timeBetweenThreadChecks) { - final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool"); + final int executorThreads, boolean allowCoreThreadTimeOut, int simpleTimerThreads, + long timeBetweenThreadChecks) { + final ThreadPoolExecutor pool = + new SimpleThreadPool(executorThreads, allowCoreThreadTimeOut, "ClientPool"); // periodically adjust the number of threads we need by checking how busy our threads are SimpleTimer.getInstance(simpleTimerThreads).schedule(() -> { // there is a minor race condition between sampling the current state of the thread pool and @@ -347,13 +358,14 @@ public class TServerUtils { */ public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads, - int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException { + boolean allowCoreThreadTimeOut, int numSimpleTimerThreads, long timeBetweenThreadChecks) + throws TTransportException { InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort()); // Must use an ISA, providing only a port would ignore the hostname given TServerSocket transport = new TServerSocket(isa); ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, - numSimpleTimerThreads, timeBetweenThreadChecks); + allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks); TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool); @@ 
-457,8 +469,8 @@ public class TServerUtils { */ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams, - String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks) - throws TTransportException { + String serverName, int numThreads, boolean allowCoreThreadTimeOut, int numSimpleTimerThreads, + long timeBetweenThreadChecks) throws TTransportException { TServerSocket transport; try { transport = getSslServerSocket(address.getPort(), (int) socketTimeout, @@ -474,7 +486,7 @@ public class TServerUtils { } ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, - numSimpleTimerThreads, timeBetweenThreadChecks); + allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks); return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), protocolFactory, pool), address); @@ -482,8 +494,8 @@ public class TServerUtils { public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params, - final String serverName, final int numThreads, final int numSTThreads, - long timeBetweenThreadChecks) throws TTransportException { + final String serverName, final int numThreads, final boolean allowCoreThreadTimeOut, + final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException { // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the // TThreadPoolServer does, // but sadly this isn't the case. 
Because TSaslTransport needs to issue a handshake when it @@ -564,8 +576,8 @@ public class TServerUtils { log.info("SASL thrift server bound on {}", address); } - ThreadPoolExecutor pool = - createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, + allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks); final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool); @@ -575,9 +587,10 @@ public class TServerUtils { public static ServerAddress startTServer(MetricsSystem metricsSystem, AccumuloConfiguration conf, ThriftServerType serverType, TProcessor processor, String serverName, String threadName, - int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, - SslConnectionParams sslParams, SaslServerConnectionParams saslParams, - long serverSocketTimeout, HostAndPort... addresses) throws TTransportException { + int numThreads, boolean allowCoreThreadTimeOut, int numSTThreads, + long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, + SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... 
addresses) + throws TTransportException { if (serverType == ThriftServerType.SASL) { processor = updateSaslProcessor(serverType, processor); @@ -585,22 +598,23 @@ public class TServerUtils { return startTServer(serverType, new TimedProcessor(metricsSystem, conf, processor, serverName, threadName), serverName, - threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, - saslParams, serverSocketTimeout, addresses); + threadName, numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, + maxMessageSize, sslParams, saslParams, serverSocketTimeout, addresses); } /** * @see #startTServer(ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int, - * int, long, long, SslConnectionParams, SaslServerConnectionParams, long, HostAndPort...) + * boolean, int, long, long, SslConnectionParams, SaslServerConnectionParams, long, + * HostAndPort...) */ public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor, - String serverName, String threadName, int numThreads, int numSTThreads, - long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, - SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses) - throws TTransportException { + String serverName, String threadName, int numThreads, boolean allowCoreThreadTimeOut, + int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, + SslConnectionParams sslParams, SaslServerConnectionParams saslParams, + long serverSocketTimeout, HostAndPort... 
addresses) throws TTransportException { return startTServer(serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName, - numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams, - serverSocketTimeout, addresses); + numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize, + sslParams, saslParams, serverSocketTimeout, addresses); } /** @@ -612,8 +626,8 @@ public class TServerUtils { */ public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor, TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads, - int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, - SslConnectionParams sslParams, SaslServerConnectionParams saslParams, + boolean allowCoreThreadTimeOut, int numSTThreads, long timeBetweenThreadChecks, + long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses) throws TTransportException { // This is presently not supported. 
It's hypothetically possible, I believe, to work, but it @@ -629,30 +643,33 @@ public class TServerUtils { switch (serverType) { case SSL: log.debug("Instantiating SSL Thrift server"); - serverAddress = - createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, - sslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + serverAddress = createSslThreadPoolServer(address, processor, protocolFactory, + serverSocketTimeout, sslParams, serverName, numThreads, allowCoreThreadTimeOut, + numSTThreads, timeBetweenThreadChecks); break; case SASL: log.debug("Instantiating SASL Thrift server"); - serverAddress = - createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout, - saslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory, + serverSocketTimeout, saslParams, serverName, numThreads, allowCoreThreadTimeOut, + numSTThreads, timeBetweenThreadChecks); break; case THREADPOOL: log.debug("Instantiating unsecure TThreadPool Thrift server"); serverAddress = createBlockingServer(address, processor, protocolFactory, - maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks); + maxMessageSize, serverName, numThreads, allowCoreThreadTimeOut, numSTThreads, + timeBetweenThreadChecks); break; case THREADED_SELECTOR: log.debug("Instantiating default, unsecure Threaded selector Thrift server"); serverAddress = createThreadedSelectorServer(address, processor, protocolFactory, - serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize); + serverName, numThreads, allowCoreThreadTimeOut, numSTThreads, + timeBetweenThreadChecks, maxMessageSize); break; case CUSTOM_HS_HA: log.debug("Instantiating unsecure custom half-async Thrift server"); - serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName, - numThreads, numSTThreads, timeBetweenThreadChecks, 
maxMessageSize); + serverAddress = + createNonBlockingServer(address, processor, protocolFactory, serverName, numThreads, + allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize); break; default: throw new IllegalArgumentException("Unknown server type " + serverType); diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java index a7585ff..4429651 100644 --- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java +++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java @@ -392,7 +392,8 @@ public class TServerUtilsTest { return TServerUtils.startServer(Metrics.initSystem(getClass().getSimpleName()), ctx, hostname, Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread", - Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK, + Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, + Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); } diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 65830df..edb0803 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -653,7 +653,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface { try { ServerAddress server = TServerUtils.startTServer(getMetricsSystem(), getConfiguration(), getContext().getThriftServerType(), processor, this.getClass().getSimpleName(), - "GC Monitor Service", 2, + "GC Monitor Service", 2, true, getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, getContext().getServerSslParams(), 
getContext().getSaslParams(), 0, addresses); diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 212cda8..a76f9dd 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -1012,8 +1012,8 @@ public class Master extends AbstractServer try { sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(), Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null, - Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK, - Property.GENERAL_MAX_MESSAGE_SIZE); + Property.MASTER_MINTHREADS, Property.MASTER_MINTHREADS_ALLOW_TIMEOUT, + Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); } catch (UnknownHostException e) { throw new IllegalStateException("Unable to start server on host " + getHostname(), e); } @@ -1329,7 +1329,7 @@ public class Master extends AbstractServer ServerAddress replAddress = TServerUtils.startServer(getMetricsSystem(), context, getHostname(), Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor, "Master Replication Coordinator", "Replication Coordinator", null, - Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, + Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, null, Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE); log.info("Started replication coordinator service at " + replAddress.address); diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java index 9e5b736..850c837 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java @@ -206,7 +206,7 @@ public class 
BulkImport extends MasterRepo { @SuppressWarnings("deprecation") int workerCount = serverConfig.getCount( serverConfig.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS)); - SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move"); + SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulk move"); List> results = new ArrayList<>(); for (FileStatus file : mapFiles) { diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java index ad1a155..0029779 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java @@ -93,8 +93,10 @@ class LoadFiles extends MasterRepo { private static synchronized ExecutorService getThreadPool(Master master) { if (threadPool == null) { int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE); - ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import"); - pool.allowCoreThreadTimeOut(true); + boolean allowCoreThreadTimeOut = + master.getConfiguration().getBoolean(Property.MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT); + ThreadPoolExecutor pool = + new SimpleThreadPool(threadPoolSize, allowCoreThreadTimeOut, "bulk import"); threadPool = new TraceExecutorService(pool); } return threadPool; diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java index 1b63213..a5e5690 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java @@ -113,7 +113,7 @@ class BulkImportMove extends MasterRepo { 
@SuppressWarnings("deprecation") int workerCount = aConf.getCount( aConf.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS)); - SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulkDir move"); + SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulkDir move"); List> results = new ArrayList<>(); String fmtTid = FateTxId.formatTid(tid); diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java index acea637..401f0ed 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java @@ -64,7 +64,7 @@ class MoveExportedFiles extends MasterRepo { String fmtTid = FateTxId.formatTid(tid); int workerCount = master.getConfiguration().getCount(Property.MASTER_RENAME_THREADS); - SimpleThreadPool workers = new SimpleThreadPool(workerCount, "importtable rename"); + SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "importtable rename"); List> results = new ArrayList<>(); VolumeManager fs = master.getVolumeManager(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 8b6047f..fc3c248 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -538,7 +538,8 @@ public class TabletServer extends AbstractServer { ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), address, Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(), "Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, - Property.TSERV_THREADCHECK, 
maxMessageSizeProperty); + Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK, + maxMessageSizeProperty); this.server = sp.server; return sp.address; } @@ -602,10 +603,11 @@ public class TabletServer extends AbstractServer { Property maxMessageSizeProperty = getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null ? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE; - ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), - clientAddress.getHost(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, - "ReplicationServicerHandler", "Replication Servicer", Property.TSERV_PORTSEARCH, - Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); + ServerAddress sp = + TServerUtils.startServer(getMetricsSystem(), getContext(), clientAddress.getHost(), + Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler", + "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS, + null, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty); this.replServer = sp.server; log.info("Started replication service on {}", sp.address); @@ -748,7 +750,7 @@ public class TabletServer extends AbstractServer { } ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool( - getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue"); + getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), true, "distributed work queue"); bulkFailedCopyQ = new DistributedWorkQueue( getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration()); @@ -891,7 +893,7 @@ public class TabletServer extends AbstractServer { // Start the pool to handle outgoing replications final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool( - getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task"); + getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), true, "replication 
task"); replWorker.setExecutor(replicationThreadPool); replWorker.run(); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index e0d52ce..0c6e749 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -223,7 +223,7 @@ public class LogSorter { this.fs = fs; this.conf = conf; int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT); - this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName()); + this.threadPool = new SimpleThreadPool(threadPoolSize, true, this.getClass().getName()); this.walBlockSize = DfsLogger.getWalBlockSize(conf); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java index 8777d6f..8469bb7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java @@ -264,7 +264,7 @@ public class TabletServerLogger { if (nextLogMaker != null) { return; } - nextLogMaker = new SimpleThreadPool(1, "WALog creator"); + nextLogMaker = new SimpleThreadPool(1, true, "WALog creator"); nextLogMaker.submit(new LoggingRunnable(log, new Runnable() { @Override public void run() { diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java index 9010deb..e02ebeb 100644 --- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java +++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java @@ -76,7 +76,7 @@ public class BalanceWithOfflineTableIT extends ConfigurableMacBase { log.info("Waiting for balance"); 
- SimpleThreadPool pool = new SimpleThreadPool(1, "waitForBalance"); + SimpleThreadPool pool = new SimpleThreadPool(1, true, "waitForBalance"); Future wait = pool.submit(() -> { c.instanceOperations().waitForBalance(); return true; diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java index 8587efa..01a49b7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java @@ -209,7 +209,7 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness { allMuts.add(muts); } - SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, "ClientThreads"); + SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, true, "ClientThreads"); threads.allowCoreThreadTimeOut(false); threads.prestartAllCoreThreads(); diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index 899d6b8..d50d1f1 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -109,7 +109,7 @@ public class ZombieTServer { Processor processor = new Processor<>(tch); ServerAddress serverPort = TServerUtils.startTServer( Metrics.initSystem(ZombieTServer.class.getSimpleName()), context.getConfiguration(), - ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, 1, 1000, + ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, true, 1, 1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port)); String addressString = serverPort.address.toString(); diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java 
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java index 9ce7525..2e807e5 100644 --- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java @@ -303,7 +303,7 @@ public class NullTserver { Processor processor = new Processor<>(tch); TServerUtils.startTServer(Metrics.initSystem(NullTserver.class.getSimpleName()), context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer", - "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1, + "null tserver", 2, true, 1, 1000, 10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", opts.port)); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);