Return-Path: X-Original-To: apmail-accumulo-commits-archive@www.apache.org Delivered-To: apmail-accumulo-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C01F390EA for ; Tue, 1 May 2012 12:24:55 +0000 (UTC) Received: (qmail 52738 invoked by uid 500); 1 May 2012 12:24:55 -0000 Delivered-To: apmail-accumulo-commits-archive@accumulo.apache.org Received: (qmail 52704 invoked by uid 500); 1 May 2012 12:24:55 -0000 Mailing-List: contact commits-help@accumulo.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@accumulo.apache.org Delivered-To: mailing list commits@accumulo.apache.org Received: (qmail 52688 invoked by uid 99); 1 May 2012 12:24:55 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2012 12:24:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 01 May 2012 12:24:51 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 7DD072388860 for ; Tue, 1 May 2012 12:24:29 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1332629 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/conf/Property.java server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java test/system/auto/simple/dynamicThreadPools.py Date: Tue, 01 May 2012 12:24:29 -0000 To: commits@accumulo.apache.org From: ecn@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120501122429.7DD072388860@eris.apache.org> Author: ecn Date: Tue May 1 12:24:28 2012 New Revision: 1332629 URL: http://svn.apache.org/viewvc?rev=1332629&view=rev Log: 
ACCUMULO-401 adjust the number of threads on configurable thread pools based on dynamic configuration changes (and add a test for it) Added: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py (with props) Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1332629&r1=1332628&r2=1332629&view=diff ============================================================================== --- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original) +++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue May 1 12:24:28 2012 @@ -341,7 +341,7 @@ public enum Property { } private static final EnumSet fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED, - Property.TSERV_MAJC_THREAD_MAXOPEN, Property.TSERV_SCAN_MAX_OPENFILES, Property.TSERV_MAJC_MAXCONCURRENT, Property.TSERV_LOGGER_COUNT, + Property.TSERV_SCAN_MAX_OPENFILES, Property.TSERV_LOGGER_COUNT, Property.LOGGER_PORT, Property.MASTER_CLIENTPORT, Property.GC_PORT); public static boolean isFixedZooPropertyKey(Property key) { Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1332629&r1=1332628&r2=1332629&view=diff ============================================================================== --- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original) +++ 
accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Tue May 1 12:24:28 2012 @@ -22,7 +22,6 @@ import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.SortedMap; @@ -47,7 +46,6 @@ import org.apache.accumulo.core.file.blo import org.apache.accumulo.core.util.Daemon; import org.apache.accumulo.core.util.LoggingRunnable; import org.apache.accumulo.core.util.MetadataTable.DataFileValue; -import org.apache.accumulo.core.util.Pair; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.server.conf.ServerConfiguration; import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager; @@ -103,16 +101,41 @@ public class TabletServerResourceManager return tp; } + private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) { + ExecutorService result = addEs(name, tp); + SimpleTimer.getInstance().schedule(new TimerTask() { + @Override + public void run() { + try { + int max = conf.getConfiguration().getCount(maxThreads); + if (tp.getMaximumPoolSize() != max) { + log.info("Changing " + maxThreads.getKey() + " to " + max); + tp.setCorePoolSize(max); + tp.setMaximumPoolSize(max); + } + } catch (Throwable t) { + log.error(t, t); + } + } + + }, 1000, 10 * 1000); + return result; + } + private ExecutorService createEs(int max, String name) { return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name))); } - private ExecutorService createEs(int max, String name, BlockingQueue queue) { - ThreadPoolExecutor tp = new ThreadPoolExecutor(max, max, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); - - return addEs(name, tp); + private ExecutorService createEs(Property max, String name) { + return createEs(max, name, new LinkedBlockingQueue()); } - + + private ExecutorService 
createEs(Property max, String name, BlockingQueue queue) { + int maxThreads = conf.getConfiguration().getCount(max); + ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name)); + return addEs(max, name, tp); + } + private ExecutorService createEs(int min, int max, int timeout, String name) { return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue(), new NamingThreadFactory(name))); } @@ -145,11 +168,11 @@ public class TabletServerResourceManager log.warn("In-memory map may not fit into local memory space."); } - minorCompactionThreadPool = createEs(acuConf.getCount(Property.TSERV_MINC_MAXCONCURRENT), "minor compactor"); + minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor"); // make this thread pool have a priority queue... and execute tablets with the most // files first! - majorCompactionThreadPool = createEs(acuConf.getCount(Property.TSERV_MAJC_MAXCONCURRENT), "major compactor", new CompactionQueue()); + majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue()); rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor"); defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor"); @@ -157,7 +180,7 @@ public class TabletServerResourceManager defaultSplitThreadPool = createEs(0, 1, 60, "md splitter"); defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration"); - migrationPool = createEs(acuConf.getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), "tablet migration"); + migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration"); // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because // individual tablet servers are already running assignments concurrently... 
having each individual tablet server run @@ -166,8 +189,8 @@ public class TabletServerResourceManager assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment"); - readAheadThreadPool = createEs(acuConf.getCount(Property.TSERV_READ_AHEAD_MAXCONCURRENT), "tablet read ahead"); - defaultReadAheadThreadPool = createEs(acuConf.getCount(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT), "metadata tablets read ahead"); + readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead"); + defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead"); tabletResources = new HashSet(); @@ -189,32 +212,6 @@ public class TabletServerResourceManager } memMgmt = new MemoryManagementFramework(); - - // do this last since "this" is leaking out to call back code below - SimpleTimer.getInstance().schedule(new TimerTask() { - @Override - public void run() { - try { - // periodically reset the configurable thread pool sizes - List<Pair<ExecutorService,Property>> services = new ArrayList<Pair<ExecutorService,Property>>(); - services.add(new Pair(minorCompactionThreadPool, Property.TSERV_MINC_MAXCONCURRENT)); - services.add(new Pair(majorCompactionThreadPool, Property.TSERV_MAJC_MAXCONCURRENT)); - services.add(new Pair(migrationPool, Property.TSERV_MIGRATE_MAXCONCURRENT)); - services.add(new Pair(readAheadThreadPool, Property.TSERV_READ_AHEAD_MAXCONCURRENT)); - services.add(new Pair(defaultReadAheadThreadPool, Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT)); - for (Pair<ExecutorService,Property> pair : services) { - int count = acuConf.getCount(pair.getSecond()); - ThreadPoolExecutor tp = (ThreadPoolExecutor) pair.getFirst(); - if (tp.getMaximumPoolSize() != count) { - tp.setMaximumPoolSize(count); - } - } - } catch (Exception e) { - log.warn("Failed to change number of threads in pool " + e.getMessage(), e); - } - } - }, 1000, 10 * 1000); - } private static class TabletStateImpl implements TabletState, Cloneable { Added: 
accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py?rev=1332629&view=auto ============================================================================== --- accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py (added) +++ accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py Tue May 1 12:24:28 2012 @@ -0,0 +1,74 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import logging +import unittest +import time +import sys + +from simple.readwrite import SunnyDayTest +from TestUtils import TestUtilsMixin + +log = logging.getLogger('test.auto') + +class DynamicThreadPools(SunnyDayTest): + 'Verify we can change thread pool sizes after the servers have run' + order = 50 + + settings = SunnyDayTest.settings.copy() + settings.update({ + 'tserver.compaction.major.delay': 1, + }) + + def setUp(self): + TestUtilsMixin.setUp(self); + + # initialize the database + self.createTable('test_ingest') + + # start test ingestion + log.info("Starting Test Ingester") + self.ingester = self.ingest(self.masterHost(), + self.options.rows*10, + size=self.options.size) + + def runTest(self): + self.waitForStop(self.ingester, self.waitTime()) + # make a bunch of work for compaction + self.shell(self.masterHost(), + 'clonetable test_ingest test_ingest1\n' + 'clonetable test_ingest test_ingest2\n' + 'clonetable test_ingest test_ingest3\n' + 'clonetable test_ingest test_ingest4\n' + 'clonetable test_ingest test_ingest5\n' + 'clonetable test_ingest test_ingest6\n' + 'config -s tserver.compaction.major.concurrent.max=1\n' + 'sleep 10\n' + 'compact -p .*\n' + 'sleep 2\n') + handle = self.runOn(self.masterHost(), [self.accumulo_sh(), 'org.apache.accumulo.server.test.GetMasterStats']) + out, err = self.waitForStop(handle, 120) + count = 0 + for line in out.split('\n'): + if line.find('Major Compacting') >= 0: + count += int(line.split()[2]) + self.assert_(count == 1) + +def suite(): + result = unittest.TestSuite() + result.addTest(DynamicThreadPools()) + return result Propchange: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py ------------------------------------------------------------------------------ svn:eol-style = native Propchange: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py ------------------------------------------------------------------------------ svn:executable = *