Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CF35110D0E for ; Tue, 18 Mar 2014 13:49:32 +0000 (UTC) Received: (qmail 97600 invoked by uid 500); 18 Mar 2014 13:49:32 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 97501 invoked by uid 500); 18 Mar 2014 13:49:28 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 97449 invoked by uid 99); 18 Mar 2014 13:49:27 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Mar 2014 13:49:27 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id 7BD5394728F; Tue, 18 Mar 2014 13:49:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: bhavanki@apache.org To: commits@accumulo.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: git commit: ACCUMULO-2419 Reimplement SimpleTimer using executors Date: Tue, 18 Mar 2014 13:49:26 +0000 (UTC) Repository: accumulo Updated Branches: refs/heads/master 18fee706b -> 3bd0caa58 ACCUMULO-2419 Reimplement SimpleTimer using executors The SimpleTimer class is changed to use Java's executor service for thread pools, rather than java.util.Timer. This allows Accumulo to run SimpleTimer tasks on more than one thread, for systems that can handle the load. This change also transitions a wide variety of SimpleTimer users to use the new form of the SimpleTimer.getInstance() method. 
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/3bd0caa5 Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/3bd0caa5 Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/3bd0caa5 Branch: refs/heads/master Commit: 3bd0caa5877e16fd3ff98863040dc6eb67fdeef2 Parents: 18fee70 Author: Bill Havanki Authored: Mon Mar 3 13:51:37 2014 -0500 Committer: Bill Havanki Committed: Tue Mar 18 09:49:01 2014 -0400 ---------------------------------------------------------------------- .../org/apache/accumulo/core/conf/Property.java | 2 + .../impl/MiniAccumuloClusterImpl.java | 2 +- .../org/apache/accumulo/server/Accumulo.java | 7 +- .../accumulo/server/master/LiveTServerSet.java | 2 +- .../accumulo/server/util/TServerUtils.java | 17 +-- .../accumulo/server/util/time/SimpleTimer.java | 124 ++++++++++++++----- .../server/zookeeper/DistributedWorkQueue.java | 7 +- .../server/util/time/SimpleTimerTest.java | 99 +++++++++++++++ .../accumulo/gc/SimpleGarbageCollector.java | 4 +- .../java/org/apache/accumulo/master/Master.java | 4 +- .../master/recovery/RecoveryManager.java | 14 ++- .../accumulo/master/tableOps/BulkImport.java | 2 +- .../accumulo/monitor/servlets/BasicServlet.java | 2 +- .../org/apache/accumulo/tracer/TraceServer.java | 2 +- .../accumulo/tserver/CompactionWatcher.java | 2 +- .../apache/accumulo/tserver/FileManager.java | 2 +- .../apache/accumulo/tserver/TabletServer.java | 24 ++-- .../tserver/TabletServerResourceManager.java | 2 +- .../apache/accumulo/tserver/log/LogSorter.java | 2 +- .../accumulo/test/functional/ZombieTServer.java | 2 +- .../test/performance/thrift/NullTserver.java | 2 +- 21 files changed, 250 insertions(+), 74 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/core/src/main/java/org/apache/accumulo/core/conf/Property.java 
---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index efa7eb5..953ca8e 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -156,6 +156,8 @@ public enum Property { GENERAL_KERBEROS_PRINCIPAL("general.kerberos.principal", "", PropertyType.STRING, "Name of the kerberos principal to use. _HOST will automatically be " + "replaced by the machines hostname in the hostname portion of the principal. Leave blank if not using kerberoized hdfs"), GENERAL_MAX_MESSAGE_SIZE("general.server.message.size.max", "1G", PropertyType.MEMORY, "The maximum size of a message that can be sent to a server."), + GENERAL_SIMPLETIMER_THREADPOOL_SIZE("general.server.simpletimer.threadpool.size", "1", PropertyType.COUNT, "The number of threads to use for " + + "server-internal scheduled tasks"), @Experimental GENERAL_VOLUME_CHOOSER("general.volume.chooser", "org.apache.accumulo.server.fs.RandomVolumeChooser", PropertyType.CLASSNAME, "The class that will be used to select which volume will be used to create new files."), http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java ---------------------------------------------------------------------- diff --git a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java index 35a7b9d..2252b1a 100644 --- a/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java +++ b/minicluster/src/main/java/org/apache/accumulo/minicluster/impl/MiniAccumuloClusterImpl.java @@ -93,7 +93,7 @@ public class MiniAccumuloClusterImpl { this.in = new 
BufferedReader(new InputStreamReader(stream)); out = new BufferedWriter(new FileWriter(logFile)); - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(null).schedule(new Runnable() { @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java index 2fa9051..21e9955 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java +++ b/server/base/src/main/java/org/apache/accumulo/server/Accumulo.java @@ -26,6 +26,7 @@ import java.util.Map.Entry; import java.util.TreeMap; import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.trace.DistributedTrace; import org.apache.accumulo.core.util.AddressUtil; @@ -140,14 +141,14 @@ public class Accumulo { log.info(key + " = " + (Property.isSensitive(key) ? 
"" : entry.getValue())); } - monitorSwappiness(); + monitorSwappiness(config.getConfiguration()); } /** * */ - public static void monitorSwappiness() { - SimpleTimer.getInstance().schedule(new Runnable() { + public static void monitorSwappiness(AccumuloConfiguration config) { + SimpleTimer.getInstance(config).schedule(new Runnable() { @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java index 63bd894..f0fea77 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java +++ b/server/base/src/main/java/org/apache/accumulo/server/master/LiveTServerSet.java @@ -226,7 +226,7 @@ public class LiveTServerSet implements Watcher { public synchronized void startListeningForTabletServerChanges() { scanServers(); - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(conf).schedule(new Runnable() { @Override public void run() { scanServers(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java index 6d9e4c7..dc243c2 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java +++ b/server/base/src/main/java/org/apache/accumulo/server/util/TServerUtils.java @@ -119,7 +119,8 @@ public class TServerUtils { port = 1024 + port % (65535 - 1024); try { HostAndPort addr = HostAndPort.fromParts(address, port); - return 
TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, timeBetweenThreadChecks, maxMessageSize, + return TServerUtils.startTServer(addr, timedProcessor, serverName, threadName, minThreads, + conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), timeBetweenThreadChecks, maxMessageSize, SslConnectionParams.forServer(conf), conf.getTimeInMillis(Property.GENERAL_RPC_TIMEOUT)); } catch (TTransportException ex) { log.error("Unable to start TServer", ex); @@ -234,7 +235,7 @@ public class TServerUtils { } public static ServerAddress createHsHaServer(HostAndPort address, TProcessor processor, final String serverName, String threadName, final int numThreads, - long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { + final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException { TNonblockingServerSocket transport = new TNonblockingServerSocket(new InetSocketAddress(address.getHostText(), address.getPort())); THsHaServer.Args options = new THsHaServer.Args(transport); options.protocolFactory(ThriftUtil.protocolFactory()); @@ -246,7 +247,7 @@ public class TServerUtils { */ final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool"); // periodically adjust the number of threads we need by checking how busy our threads are - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() { @Override public void run() { if (pool.getCorePoolSize() <= pool.getActiveCount()) { @@ -316,20 +317,20 @@ public class TServerUtils { return new ServerAddress(createThreadPoolServer(transport, processor), address); } - public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, + public static ServerAddress startTServer(HostAndPort address, TProcessor processor, String serverName, String threadName, int numThreads, int numSTThreads, long 
timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { - return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, timeBetweenThreadChecks, - maxMessageSize, sslParams, sslSocketTimeout); + return startTServer(address, new TimedProcessor(processor, serverName, threadName), serverName, threadName, numThreads, numSTThreads, + timeBetweenThreadChecks, maxMessageSize, sslParams, sslSocketTimeout); } public static ServerAddress startTServer(HostAndPort address, TimedProcessor processor, String serverName, String threadName, int numThreads, - long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { + int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams, long sslSocketTimeout) throws TTransportException { ServerAddress serverAddress; if (sslParams != null) { serverAddress = createSslThreadPoolServer(address, processor, sslSocketTimeout, sslParams); } else { - serverAddress = createHsHaServer(address, processor, serverName, threadName, numThreads, timeBetweenThreadChecks, maxMessageSize); + serverAddress = createHsHaServer(address, processor, serverName, threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize); } final TServer finalServer = serverAddress.server; Runnable serveTask = new Runnable() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java index 499b0de..b8f7f6d 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java +++ 
b/server/base/src/main/java/org/apache/accumulo/server/util/time/SimpleTimer.java @@ -16,56 +16,120 @@ */ package org.apache.accumulo.server.util.time; -import java.util.Timer; -import java.util.TimerTask; - +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import org.apache.accumulo.core.conf.AccumuloConfiguration; +import org.apache.accumulo.core.conf.Property; import org.apache.log4j.Logger; /** - * Generic singleton timer: don't use it if you are going to do anything that will take very long. Please use it to reduce the number of threads dedicated to + * Generic singleton timer. Don't use this if you are going to do anything that will take very long. Please use it to reduce the number of threads dedicated to * simple events. * */ public class SimpleTimer { + private static final Logger log = Logger.getLogger(SimpleTimer.class); - static class LoggingTimerTask extends TimerTask { - - private Runnable task; - - LoggingTimerTask(Runnable task) { - this.task = task; - } - - @Override - public void run() { - try { - task.run(); - } catch (Throwable t) { - Logger.getLogger(LoggingTimerTask.class).warn("Timer task failed " + task.getClass().getName() + " " + t.getMessage(), t); - } + private static class ExceptionHandler implements Thread.UncaughtExceptionHandler { + public void uncaughtException(Thread t, Throwable e) { + log.warn("SimpleTimer task failed", e); } - } + private static int instanceThreadPoolSize = -1; private static SimpleTimer instance; - private Timer timer; + private ScheduledExecutorService executor; + private static final int DEFAULT_THREAD_POOL_SIZE = 1; + /** + * Gets the timer instance. 
+ * + * @deprecated Use {@link #getInstance(AccumuloConfiguration)} instead to + * get the configured number of threads. + */ + @Deprecated public static synchronized SimpleTimer getInstance() { - if (instance == null) - instance = new SimpleTimer(); + return getInstance(null); + } + /** + * Gets the timer instance. If an instance has already been created, it will + * have the number of threads supplied when it was constructed, and the size + * provided here is ignored. + * + * @param threadPoolSize number of threads + */ + public static synchronized SimpleTimer getInstance(int threadPoolSize) { + if (instance == null) { + instance = new SimpleTimer(threadPoolSize); + instance.instanceThreadPoolSize = threadPoolSize; + } else { + if (instance.instanceThreadPoolSize != threadPoolSize) { + log.warn("Asked to create SimpleTimer with thread pool size " + + threadPoolSize + ", existing instance has " + + instanceThreadPoolSize); + } + } return instance; } - - private SimpleTimer() { - timer = new Timer("SimpleTimer", true); + /** + * Gets the timer instance. If an instance has already been created, it will + * have the number of threads supplied when it was constructed, and the size + * provided by the configuration here is ignored. If a null configuration is + * supplied, the number of threads defaults to 1. + * + * @param conf configuration from which to get the number of threads + * @see Property#GENERAL_SIMPLETIMER_THREADPOOL_SIZE + */ + public static synchronized SimpleTimer getInstance(AccumuloConfiguration conf) { + int threadPoolSize; + if (conf != null) { + threadPoolSize = conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE); + } else { + threadPoolSize = DEFAULT_THREAD_POOL_SIZE; + } + return getInstance(threadPoolSize); + } + + /** + * Gets the thread pool size for the timer instance. Use for testing only. 
+ * + * @return thread pool size for timer instance, or -1 if not yet constructed + */ + @VisibleForTesting + static int getInstanceThreadPoolSize() { + return instanceThreadPoolSize; } - public void schedule(Runnable task, long delay) { - timer.schedule(new LoggingTimerTask(task), delay); + private SimpleTimer(int threadPoolSize) { + executor = Executors.newScheduledThreadPool(threadPoolSize, new ThreadFactoryBuilder().setNameFormat("SimpleTimer-%d").setDaemon(true) + .setUncaughtExceptionHandler(new ExceptionHandler()).build()); } - public void schedule(Runnable task, long delay, long period) { - timer.schedule(new LoggingTimerTask(task), delay, period); + /** + * Schedules a task to run in the future. + * + * @param task task to run + * @param delay number of milliseconds to wait before execution + * @return future for scheduled task + */ + public ScheduledFuture schedule(Runnable task, long delay) { + return executor.schedule(task, delay, TimeUnit.MILLISECONDS); } + /** + * Schedules a task to run in the future with a fixed delay between repeated + * executions. 
+ * + * @param task task to run + * @param delay number of milliseconds to wait before first execution + * @param period number of milliseconds to wait between executions + * @return future for scheduled task + */ + public ScheduledFuture schedule(Runnable task, long delay, long period) { + return executor.scheduleWithFixedDelay(task, delay, period, TimeUnit.MILLISECONDS); + } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java ---------------------------------------------------------------------- diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java index c5a9528..4738c2b 100644 --- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java +++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java @@ -24,6 +24,7 @@ import java.util.Set; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy; import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy; import org.apache.accumulo.server.util.time.SimpleTimer; @@ -47,6 +48,7 @@ public class DistributedWorkQueue { private ThreadPoolExecutor threadPool; private ZooReaderWriter zoo = ZooReaderWriter.getInstance(); private String path; + private AccumuloConfiguration config; private AtomicInteger numTask = new AtomicInteger(0); @@ -147,8 +149,9 @@ public class DistributedWorkQueue { void process(String workID, byte[] data); } - public DistributedWorkQueue(String path) { + public DistributedWorkQueue(String path, AccumuloConfiguration config) { this.path = path; + this.config = config; } public void startProcessing(final Processor processor, 
ThreadPoolExecutor executorService) throws KeeperException, InterruptedException { @@ -189,7 +192,7 @@ public class DistributedWorkQueue { Random r = new Random(); // Add a little jitter to avoid all the tservers slamming zookeeper at once - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(config).schedule(new Runnable() { @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java ---------------------------------------------------------------------- diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java new file mode 100644 index 0000000..0a59812 --- /dev/null +++ b/server/base/src/test/java/org/apache/accumulo/server/util/time/SimpleTimerTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.accumulo.server.util.time; + +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.*; + +public class SimpleTimerTest { + private static final long DELAY = 1000L; + private static final long PERIOD = 2000L; + private static final long PAD = 100L; + private SimpleTimer t; + + @Before + public void setUp() { + t = SimpleTimer.getInstance(null); + } + + private static class Incrementer implements Runnable { + private final AtomicInteger i; + private volatile boolean canceled; + + Incrementer(AtomicInteger i) { + this.i = i; + canceled = false; + } + + public void run() { + if (canceled) { + return; + } + i.incrementAndGet(); + } + + public void cancel() { + canceled = true; + } + } + + private static class Thrower implements Runnable { + boolean wasRun = false; + public void run() { + wasRun = true; + throw new IllegalStateException("You shall not pass"); + } + } + + @Test + public void testOneTimeSchedule() throws InterruptedException { + AtomicInteger i = new AtomicInteger(); + Incrementer r = new Incrementer(i); + t.schedule(r, DELAY); + Thread.sleep(DELAY + PAD); + assertEquals(1, i.get()); + r.cancel(); + } + + @Test + public void testFixedDelaySchedule() throws InterruptedException { + AtomicInteger i = new AtomicInteger(); + Incrementer r = new Incrementer(i); + t.schedule(r, DELAY, PERIOD); + Thread.sleep(DELAY + (2 * PERIOD) + PAD); + assertEquals(3, i.get()); + r.cancel(); + } + + @Test + public void testFailure() throws InterruptedException { + Thrower r = new Thrower(); + t.schedule(r, DELAY); + Thread.sleep(DELAY + PAD); + assertTrue(r.wasRun); + } + + @Test + public void testSingleton() { + assertEquals(1, SimpleTimer.getInstanceThreadPoolSize()); + SimpleTimer t2 = SimpleTimer.getInstance(2); + assertSame(t, t2); + assertEquals(1, SimpleTimer.getInstanceThreadPoolSize()); // unchanged + } +} 
http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java ---------------------------------------------------------------------- diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java index 89925b4..95b12fe 100644 --- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java +++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java @@ -555,8 +555,8 @@ public class SimpleGarbageCollector implements Iface { HostAndPort result = HostAndPort.fromParts(opts.getAddress(), port); log.debug("Starting garbage collector listening on " + result); try { - return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, 1000, maxMessageSize, - SslConnectionParams.forServer(conf), 0).address; + return TServerUtils.startTServer(result, processor, this.getClass().getSimpleName(), "GC Monitor Service", 2, + conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000, maxMessageSize, SslConnectionParams.forServer(conf), 0).address; } catch (Exception ex) { log.fatal(ex, ex); throw new RuntimeException(ex); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/Master.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java index 8e98c04..4b4141b 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/Master.java +++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java @@ -207,7 +207,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt if (newState == MasterState.STOP) { // Give the server a little time before shutdown so 
the client // thread requesting the stop can return - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(serverConfig.getConfiguration()).schedule(new Runnable() { @Override public void run() { // This frees the main thread and will cause the master to exit @@ -901,7 +901,7 @@ public class Master implements LiveTServerSet.Listener, TableObserver, CurrentSt fate = new Fate(this, store); fate.startTransactionRunners(threads); - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(serverConfig.getConfiguration()).schedule(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java index 76d3520..a9adf8f 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java +++ b/server/master/src/main/java/org/apache/accumulo/master/recovery/RecoveryManager.java @@ -63,7 +63,8 @@ public class RecoveryManager { executor = Executors.newScheduledThreadPool(4, new NamingThreadFactory("Walog sort starter ")); zooCache = new ZooCache(); try { - List workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).getWorkQueued(); + AccumuloConfiguration aconf = master.getConfiguration().getConfiguration(); + List workIDs = new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY, aconf).getWorkQueued(); sortsQueued.addAll(workIDs); } catch (Exception e) { log.warn(e, e); @@ -87,14 +88,14 @@ public class RecoveryManager { public void run() { boolean rescheduled = false; try { - - long time = closer.close(master.getConfiguration().getConfiguration(), 
master.getFileSystem(), new Path(source)); + AccumuloConfiguration aconf = master.getConfiguration().getConfiguration(); + long time = closer.close(aconf, master.getFileSystem(), new Path(source)); if (time > 0) { executor.schedule(this, time, TimeUnit.MILLISECONDS); rescheduled = true; } else { - initiateSort(sortId, source, destination); + initiateSort(sortId, source, destination, aconf); } } catch (FileNotFoundException e) { log.debug("Unable to initate log sort for " + source + ": " + e); @@ -111,9 +112,10 @@ public class RecoveryManager { } - private void initiateSort(String sortId, String source, final String destination) throws KeeperException, InterruptedException, IOException { + private void initiateSort(String sortId, String source, final String destination, AccumuloConfiguration aconf) + throws KeeperException, InterruptedException, IOException { String work = source + "|" + destination; - new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY).addWork(sortId, work.getBytes(Constants.UTF8)); + new DistributedWorkQueue(ZooUtil.getRoot(master.getInstance()) + Constants.ZRECOVERY, aconf).addWork(sortId, work.getBytes(Constants.UTF8)); synchronized (this) { sortsQueued.add(sortId); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java ---------------------------------------------------------------------- diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java index bdc89dd..82abf22 100644 --- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java +++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/BulkImport.java @@ -432,7 +432,7 @@ class CopyFailed extends MasterRepo { if (loadedFailures.size() > 0) { DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(Constants.ZROOT + 
"/" + HdfsZooInstance.getInstance().getInstanceID() - + Constants.ZBULK_FAILED_COPYQ); + + Constants.ZBULK_FAILED_COPYQ, master.getConfiguration().getConfiguration()); HashSet workIds = new HashSet(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java ---------------------------------------------------------------------- diff --git a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java index bf65dae..3cbc0d3 100644 --- a/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java +++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/servlets/BasicServlet.java @@ -105,7 +105,7 @@ abstract public class BasicServlet extends HttpServlet { synchronized (BasicServlet.class) { // Learn our instance name asynchronously so we don't hang up if zookeeper is down if (cachedInstanceName == null) { - SimpleTimer.getInstance().schedule(new TimerTask() { + SimpleTimer.getInstance(Monitor.getSystemConfiguration()).schedule(new TimerTask() { @Override public void run() { synchronized (BasicServlet.class) { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java ---------------------------------------------------------------------- diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java index 30f1ae7..7aab5cd 100644 --- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java +++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java @@ -227,7 +227,7 @@ public class TraceServer implements Watcher { } public void run() throws Exception { - SimpleTimer.getInstance().schedule(new Runnable() { + 
SimpleTimer.getInstance(serverConfiguration.getConfiguration()).schedule(new Runnable() { @Override public void run() { flush(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java index e6ca38f..2e4d7b7 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/CompactionWatcher.java @@ -102,7 +102,7 @@ public class CompactionWatcher implements Runnable { public static synchronized void startWatching(AccumuloConfiguration config) { if (!watching) { - SimpleTimer.getInstance().schedule(new CompactionWatcher(config), 10000, 10000); + SimpleTimer.getInstance(config).schedule(new CompactionWatcher(config), 10000, 10000); watching = true; } } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java index bb95532..6b553a1 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/FileManager.java @@ -177,7 +177,7 @@ public class FileManager { this.reservedReaders = new HashMap(); this.maxIdleTime = conf.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE); - SimpleTimer.getInstance().schedule(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2); + SimpleTimer.getInstance(conf.getConfiguration()).schedule(new IdleFileCloser(), maxIdleTime, 
maxIdleTime / 2); } http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java index 3d58a99..a718748 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java @@ -261,8 +261,9 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu this.serverConfig = conf; this.instance = conf.getInstance(); this.fs = fs; - this.logSorter = new LogSorter(instance, fs, getSystemConfiguration()); - SimpleTimer.getInstance().schedule(new Runnable() { + AccumuloConfiguration aconf = getSystemConfiguration(); + this.logSorter = new LogSorter(instance, fs, aconf); + SimpleTimer.getInstance(aconf).schedule(new Runnable() { @Override public void run() { synchronized (onlineTablets) { @@ -358,6 +359,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu SecureRandom random; Map sessions; long maxIdle; + AccumuloConfiguration aconf; SessionManager(AccumuloConfiguration conf) { random = new SecureRandom(); @@ -372,7 +374,8 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } }; - SimpleTimer.getInstance().schedule(r, 0, Math.max(maxIdle / 2, 1000)); + SimpleTimer.getInstance(conf).schedule(r, 0, Math.max(maxIdle / 2, 1000)); + aconf = conf; } synchronized long createSession(Session session, boolean reserve) { @@ -515,7 +518,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } }; - SimpleTimer.getInstance().schedule(r, delay); + SimpleTimer.getInstance(aconf).schedule(r, delay); } } @@ -2958,7 +2961,7 @@ public class TabletServer extends 
AbstractMetricsImpl implements org.apache.accu enqueueMasterMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent)); long reschedule = Math.min((1l << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000l); log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.)); - SimpleTimer.getInstance().schedule(new TimerTask() { + SimpleTimer.getInstance(getSystemConfiguration()).schedule(new TimerTask() { @Override public void run() { log.info("adding tablet " + extent + " back to the assignment pool (retry " + retryAttempt + ")"); @@ -3167,7 +3170,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(getSystemConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue"); - bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ); + bulkFailedCopyQ = new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZBULK_FAILED_COPYQ, getSystemConfiguration()); try { bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(), distWorkQThreadPool); } catch (Exception e1) { @@ -3516,9 +3519,10 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } }; - SimpleTimer.getInstance().schedule(contextCleaner, 60000, 60000); + AccumuloConfiguration aconf = getSystemConfiguration(); + SimpleTimer.getInstance(aconf).schedule(contextCleaner, 60000, 60000); - FileSystemMonitor.start(getSystemConfiguration(), Property.TSERV_MONITOR_FS); + FileSystemMonitor.start(aconf, Property.TSERV_MONITOR_FS); Runnable gcDebugTask = new Runnable() { @Override @@ -3527,7 +3531,7 @@ public class TabletServer extends AbstractMetricsImpl implements org.apache.accu } }; - SimpleTimer.getInstance().schedule(gcDebugTask, 0, 1000); + SimpleTimer.getInstance(aconf).schedule(gcDebugTask, 0, 1000); Runnable constraintTask = new Runnable() { @@ -3545,7 +3549,7 @@ public class 
TabletServer extends AbstractMetricsImpl implements org.apache.accu } }; - SimpleTimer.getInstance().schedule(constraintTask, 0, 1000); + SimpleTimer.getInstance(aconf).schedule(constraintTask, 0, 1000); this.resourceManager = new TabletServerResourceManager(instance, fs); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index 4eae109..f26c74b 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -108,7 +108,7 @@ public class TabletServerResourceManager { private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) { ExecutorService result = addEs(name, tp); - SimpleTimer.getInstance().schedule(new Runnable() { + SimpleTimer.getInstance(conf.getConfiguration()).schedule(new Runnable() { @Override public void run() { try { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java ---------------------------------------------------------------------- diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java index 8f783c3..1e3f895 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java @@ -213,7 +213,7 @@ public class LogSorter { public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool) throws 
KeeperException, InterruptedException { this.threadPool = distWorkQThreadPool; - new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY).startProcessing(new LogProcessor(), this.threadPool); + new DistributedWorkQueue(ZooUtil.getRoot(instance) + Constants.ZRECOVERY, conf).startProcessing(new LogProcessor(), this.threadPool); } public List getLogSorts() { http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java index f26c8d7..9b3eec7 100644 --- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java +++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java @@ -98,7 +98,7 @@ public class ZombieTServer { TransactionWatcher watcher = new TransactionWatcher(); final ThriftClientHandler tch = new ThriftClientHandler(instance, watcher); Processor processor = new Processor(tch); - ServerAddress serverPort = TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1000, + ServerAddress serverPort = TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", port), processor, "ZombieTServer", "walking dead", 2, 1, 1000, 10 * 1024 * 1024, null, -1); String addressString = serverPort.address.toString(); http://git-wip-us.apache.org/repos/asf/accumulo/blob/3bd0caa5/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java ---------------------------------------------------------------------- diff --git a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java index 0591b19..6c34172 100644 --- 
a/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java +++ b/test/src/main/java/org/apache/accumulo/test/performance/thrift/NullTserver.java @@ -243,7 +243,7 @@ public class NullTserver { TransactionWatcher watcher = new TransactionWatcher(); ThriftClientHandler tch = new ThriftClientHandler(HdfsZooInstance.getInstance(), watcher); Processor processor = new Processor(tch); - TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1000, 10 * 1024 * 1024, null, -1); + TServerUtils.startTServer(HostAndPort.fromParts("0.0.0.0", opts.port), processor, "NullTServer", "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, -1); HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);