accumulo-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From e..@apache.org
Subject svn commit: r1332629 - in /accumulo/trunk: core/src/main/java/org/apache/accumulo/core/conf/Property.java server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java test/system/auto/simple/dynamicThreadPools.py
Date Tue, 01 May 2012 12:24:29 GMT
Author: ecn
Date: Tue May  1 12:24:28 2012
New Revision: 1332629

URL: http://svn.apache.org/viewvc?rev=1332629&view=rev
Log:
ACCUMULO-401 adjust the number of threads on configurable thread pools based on dynamic configuration
changes (and add a test for it)

Added:
    accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py   (with props)
Modified:
    accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
    accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java

Modified: accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java?rev=1332629&r1=1332628&r2=1332629&view=diff
==============================================================================
--- accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java (original)
+++ accumulo/trunk/core/src/main/java/org/apache/accumulo/core/conf/Property.java Tue May  1 12:24:28 2012
@@ -341,7 +341,7 @@ public enum Property {
   }
   
   private static final EnumSet<Property> fixedProperties = EnumSet.of(Property.TSERV_CLIENTPORT, Property.TSERV_NATIVEMAP_ENABLED,
-      Property.TSERV_MAJC_THREAD_MAXOPEN, Property.TSERV_SCAN_MAX_OPENFILES, Property.TSERV_MAJC_MAXCONCURRENT, Property.TSERV_LOGGER_COUNT,
+      Property.TSERV_SCAN_MAX_OPENFILES, Property.TSERV_LOGGER_COUNT,
       Property.LOGGER_PORT, Property.MASTER_CLIENTPORT, Property.GC_PORT);
   
   public static boolean isFixedZooPropertyKey(Property key) {

Modified: accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java
URL: http://svn.apache.org/viewvc/accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java?rev=1332629&r1=1332628&r2=1332629&view=diff
==============================================================================
--- accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java (original)
+++ accumulo/trunk/server/src/main/java/org/apache/accumulo/server/tabletserver/TabletServerResourceManager.java Tue May  1 12:24:28 2012
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.SortedMap;
@@ -47,7 +46,6 @@ import org.apache.accumulo.core.file.blo
 import org.apache.accumulo.core.util.Daemon;
 import org.apache.accumulo.core.util.LoggingRunnable;
 import org.apache.accumulo.core.util.MetadataTable.DataFileValue;
-import org.apache.accumulo.core.util.Pair;
 import org.apache.accumulo.core.util.UtilWaitThread;
 import org.apache.accumulo.server.conf.ServerConfiguration;
 import org.apache.accumulo.server.tabletserver.FileManager.ScanFileManager;
@@ -103,16 +101,41 @@ public class TabletServerResourceManager
     return tp;
   }
   
+  private ExecutorService addEs(final Property maxThreads, String name, final ThreadPoolExecutor tp) {
+    ExecutorService result = addEs(name, tp);
+    SimpleTimer.getInstance().schedule(new TimerTask() {
+      @Override
+      public void run() {
+        try {
+          int max = conf.getConfiguration().getCount(maxThreads);
+          if (tp.getMaximumPoolSize() != max) {
+            log.info("Changing " + maxThreads.getKey() + " to " + max);
+            tp.setCorePoolSize(max);
+            tp.setMaximumPoolSize(max);
+          }
+        } catch (Throwable t) {
+          log.error(t, t);
+        }
+      }
+      
+    }, 1000, 10 * 1000);
+    return result;
+  }
+
   private ExecutorService createEs(int max, String name) {
     return addEs(name, Executors.newFixedThreadPool(max, new NamingThreadFactory(name)));
   }
   
-  private ExecutorService createEs(int max, String name, BlockingQueue<Runnable> queue) {
-    ThreadPoolExecutor tp = new ThreadPoolExecutor(max, max, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
-    
-    return addEs(name, tp);
+  private ExecutorService createEs(Property max, String name) {
+    return createEs(max, name, new LinkedBlockingQueue<Runnable>());
   }
-  
+
+  private ExecutorService createEs(Property max, String name, BlockingQueue<Runnable> queue) {
+    int maxThreads = conf.getConfiguration().getCount(max);
+    ThreadPoolExecutor tp = new ThreadPoolExecutor(maxThreads, maxThreads, 0L, TimeUnit.MILLISECONDS, queue, new NamingThreadFactory(name));
+    return addEs(max, name, tp);
+  }
+
   private ExecutorService createEs(int min, int max, int timeout, String name) {
     return addEs(name, new ThreadPoolExecutor(min, max, timeout, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new NamingThreadFactory(name)));
   }
@@ -145,11 +168,11 @@ public class TabletServerResourceManager
       log.warn("In-memory map may not fit into local memory space.");
     }
     
-    minorCompactionThreadPool = createEs(acuConf.getCount(Property.TSERV_MINC_MAXCONCURRENT), "minor compactor");
+    minorCompactionThreadPool = createEs(Property.TSERV_MINC_MAXCONCURRENT, "minor compactor");
     
     // make this thread pool have a priority queue... and execute tablets with the most
     // files first!
-    majorCompactionThreadPool = createEs(acuConf.getCount(Property.TSERV_MAJC_MAXCONCURRENT), "major compactor", new CompactionQueue());
+    majorCompactionThreadPool = createEs(Property.TSERV_MAJC_MAXCONCURRENT, "major compactor", new CompactionQueue());
     rootMajorCompactionThreadPool = createEs(0, 1, 300, "md root major compactor");
     defaultMajorCompactionThreadPool = createEs(0, 1, 300, "md major compactor");
     
@@ -157,7 +180,7 @@ public class TabletServerResourceManager
     defaultSplitThreadPool = createEs(0, 1, 60, "md splitter");
     
     defaultMigrationPool = createEs(0, 1, 60, "metadata tablet migration");
-    migrationPool = createEs(acuConf.getCount(Property.TSERV_MIGRATE_MAXCONCURRENT), "tablet migration");
+    migrationPool = createEs(Property.TSERV_MIGRATE_MAXCONCURRENT, "tablet migration");
     
     // not sure if concurrent assignments can run safely... even if they could there is probably no benefit at startup because
     // individual tablet servers are already running assignments concurrently... having each individual tablet server run
@@ -166,8 +189,8 @@ public class TabletServerResourceManager
     
     assignMetaDataPool = createEs(0, 1, 60, "metadata tablet assignment");
     
-    readAheadThreadPool = createEs(acuConf.getCount(Property.TSERV_READ_AHEAD_MAXCONCURRENT), "tablet read ahead");
-    defaultReadAheadThreadPool = createEs(acuConf.getCount(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT), "metadata tablets read ahead");
+    readAheadThreadPool = createEs(Property.TSERV_READ_AHEAD_MAXCONCURRENT, "tablet read ahead");
+    defaultReadAheadThreadPool = createEs(Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT, "metadata tablets read ahead");
     
     tabletResources = new HashSet<TabletResourceManager>();
     
@@ -189,32 +212,6 @@ public class TabletServerResourceManager
     }
     
     memMgmt = new MemoryManagementFramework();
-    
-    // do this last since "this" is leaking out to call back code below
-    SimpleTimer.getInstance().schedule(new TimerTask() {
-      @Override
-      public void run() {
-        try {
-          // periodically reset the configurable thread pool sizes
-          List<Pair<ExecutorService,Property>> services = new ArrayList<Pair<ExecutorService,Property>>();
-          services.add(new Pair<ExecutorService,Property>(minorCompactionThreadPool, Property.TSERV_MINC_MAXCONCURRENT));
-          services.add(new Pair<ExecutorService,Property>(majorCompactionThreadPool, Property.TSERV_MAJC_MAXCONCURRENT));
-          services.add(new Pair<ExecutorService,Property>(migrationPool, Property.TSERV_MIGRATE_MAXCONCURRENT));
-          services.add(new Pair<ExecutorService,Property>(readAheadThreadPool, Property.TSERV_READ_AHEAD_MAXCONCURRENT));
-          services.add(new Pair<ExecutorService,Property>(defaultReadAheadThreadPool, Property.TSERV_METADATA_READ_AHEAD_MAXCONCURRENT));
-          for (Pair<ExecutorService,Property> pair : services) {
-            int count = acuConf.getCount(pair.getSecond());
-            ThreadPoolExecutor tp = (ThreadPoolExecutor) pair.getFirst();
-            if (tp.getMaximumPoolSize() != count) {
-              tp.setMaximumPoolSize(count);
-            }
-          }
-        } catch (Exception e) {
-          log.warn("Failed to change number of threads in pool " + e.getMessage(), e);
-        }
-      }
-    }, 1000, 10 * 1000);
-    
   }
   
   private static class TabletStateImpl implements TabletState, Cloneable {

Added: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py
URL: http://svn.apache.org/viewvc/accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py?rev=1332629&view=auto
==============================================================================
--- accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py (added)
+++ accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py Tue May  1 12:24:28 2012
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import logging
+import unittest
+import time
+import sys
+
+from simple.readwrite import SunnyDayTest
+from TestUtils import TestUtilsMixin
+
+log = logging.getLogger('test.auto')
+
+class DynamicThreadPools(SunnyDayTest):
+    'Verify we can change thread pool sizes after the servers have run'
+    order = 50
+
+    settings = SunnyDayTest.settings.copy()
+    settings.update({
+        'tserver.compaction.major.delay': 1,
+        })
+
+    def setUp(self):
+        TestUtilsMixin.setUp(self);
+
+        # initialize the database
+        self.createTable('test_ingest')
+
+        # start test ingestion
+        log.info("Starting Test Ingester")
+        self.ingester = self.ingest(self.masterHost(),
+                                    self.options.rows*10,
+                                    size=self.options.size)
+
+    def runTest(self):
+        self.waitForStop(self.ingester, self.waitTime())
+        # make a bunch of work for compaction
+        self.shell(self.masterHost(), 
+		   'clonetable test_ingest test_ingest1\n'
+		   'clonetable test_ingest test_ingest2\n'
+ 		   'clonetable test_ingest test_ingest3\n'
+ 		   'clonetable test_ingest test_ingest4\n'
+ 		   'clonetable test_ingest test_ingest5\n'
+ 		   'clonetable test_ingest test_ingest6\n'
+                   'config -s tserver.compaction.major.concurrent.max=1\n'
+		   'sleep 10\n'
+		   'compact -p .*\n'
+		   'sleep 2\n')
+	handle = self.runOn(self.masterHost(), [self.accumulo_sh(), 'org.apache.accumulo.server.test.GetMasterStats'])
+        out, err = self.waitForStop(handle, 120)
+        count = 0
+        for line in out.split('\n'):
+	   if line.find('Major Compacting') >= 0:
+		count += int(line.split()[2])
+        self.assert_(count == 1)
+
+def suite():
+    result = unittest.TestSuite()
+    result.addTest(DynamicThreadPools())
+    return result

Propchange: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: accumulo/trunk/test/system/auto/simple/dynamicThreadPools.py
------------------------------------------------------------------------------
    svn:executable = *



Mime
View raw message