Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D86D918AF7 for ; Wed, 24 Feb 2016 23:05:55 +0000 (UTC) Received: (qmail 62360 invoked by uid 500); 24 Feb 2016 23:05:52 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 62293 invoked by uid 500); 24 Feb 2016 23:05:52 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 62284 invoked by uid 99); 24 Feb 2016 23:05:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Feb 2016 23:05:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 752E3E8EC9; Wed, 24 Feb 2016 23:05:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rkanter@apache.org To: common-commits@hadoop.apache.org Message-Id: <1d00790ee9b74fb58a72afd9e0e545e6@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter) Date: Wed, 24 Feb 2016 23:05:52 +0000 (UTC) Repository: hadoop Updated Branches: refs/heads/branch-2 92e49cdd0 -> c2098d247 YARN-4697. 
NM aggregation thread pool is not bound by limits (haibochen via rkanter) (cherry picked from commit 954dd57043d2de4f962876c1b89753bfc7e4ce55) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c2098d24 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c2098d24 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c2098d24 Branch: refs/heads/branch-2 Commit: c2098d2470e6e0ad4fab3002a75f69f8f9b19c85 Parents: 92e49cd Author: Robert Kanter Authored: Wed Feb 24 15:00:24 2016 -0800 Committer: Robert Kanter Committed: Wed Feb 24 15:00:48 2016 -0800 ---------------------------------------------------------------------- hadoop-yarn-project/CHANGES.txt | 3 + .../hadoop/yarn/conf/YarnConfiguration.java | 5 + .../src/main/resources/yarn-default.xml | 8 ++ .../logaggregation/LogAggregationService.java | 35 ++++- .../TestLogAggregationService.java | 143 +++++++++++++++++++ 5 files changed, 188 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2098d24/hadoop-yarn-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4a53abb..89da408 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -82,6 +82,9 @@ Release 2.9.0 - UNRELEASED YARN-4648. Move preemption related tests from TestFairScheduler to TestFairSchedulerPreemption. (Kai Sasaki via ozawa) + YARN-4697. 
NM aggregation thread pool is not bound by + limits (haibochen via rkanter) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2098d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 3501586..ef44149 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration { public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs"; public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs"; + /** The number of threads to handle log aggregation in node manager. 
*/ + public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE = + NM_PREFIX + "logaggregation.threadpool-size-max"; + public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100; + public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION = NM_PREFIX + "resourcemanager.minimum.version"; public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2098d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index cd4074a..72c89c9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1227,6 +1227,14 @@ + <property> + <description>Thread pool size for LogAggregationService in Node Manager. + </description> + <name>yarn.nodemanager.logaggregation.threadpool-size-max</name> + <value>100</value> + </property> + + <property> <description>Percentage of CPU that can be allocated for containers. This setting allows users to limit the amount of CPU that YARN containers use.
Currently functional only http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2098d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java index 6411535..2d6b900 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java @@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements private final ConcurrentMap appLogAggregators; - private final ExecutorService threadPool; + @VisibleForTesting + ExecutorService threadPool; public LogAggregationService(Dispatcher dispatcher, Context context, DeletionService deletionService, LocalDirsHandlerService dirsHandler) { @@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements this.dirsHandler = dirsHandler; this.appLogAggregators = new ConcurrentHashMap(); - this.threadPool = HadoopExecutors.newCachedThreadPool( - new ThreadFactoryBuilder() - .setNameFormat("LogAggregationService #%d") - .build()); } protected void serviceInit(Configuration conf) throws Exception { @@ -126,7 +123,11 @@ public class 
LogAggregationService extends AbstractService implements this.remoteRootLogDirSuffix = conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX, YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX); - + int threadPoolSize = getAggregatorThreadPoolSize(conf); + this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize, + new ThreadFactoryBuilder() + .setNameFormat("LogAggregationService #%d") + .build()); super.serviceInit(conf); } @@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements public NodeId getNodeId() { return this.nodeId; } + + + private int getAggregatorThreadPoolSize(Configuration conf) { + int threadPoolSize; + try { + threadPoolSize = conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + } catch (NumberFormatException ex) { + LOG.warn("Invalid thread pool size. Setting it to the default value " + + "in YarnConfiguration"); + threadPoolSize = YarnConfiguration. + DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE; + } + if(threadPoolSize <= 0) { + LOG.warn("Invalid thread pool size. Setting it to the default value " + + "in YarnConfiguration"); + threadPoolSize = YarnConfiguration. 
+ DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE; + } + return threadPoolSize; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/c2098d24/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java index 101fef0..87c3f27 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java @@ -55,6 +55,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.LogFactory; @@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest { return appAcls; } + @Test (timeout = 30000) + public void testFixedSizeThreadPool() throws 
Exception { + // store configured thread pool size temporarily for restoration + int initThreadPoolSize = conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + + int threadPoolSize = 3; + conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + threadPoolSize); + + DeletionService delSrvc = mock(DeletionService.class); + + LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class); + when(dirSvc.getLogDirs()).thenThrow(new RuntimeException()); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + ExecutorService executorService = logAggregationService.threadPool; + + // used to block threads in the thread pool because main thread always + // acquires the write lock first. + final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + final Lock rLock = rwLock.readLock(); + final Lock wLock = rwLock.writeLock(); + + try { + wLock.lock(); + Runnable runnable = new Runnable() { + @Override + public void run() { + try { + // threads in the thread pool running this will be blocked + rLock.tryLock(35000, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + e.printStackTrace(); + } finally { + rLock.unlock(); + } + } + }; + + // submit $(threadPoolSize + 1) runnables to the thread pool. If the thread + // pool size is set properly, only $(threadPoolSize) threads will be + // created in the thread pool, each of which is blocked on the read lock. 
+ for(int i = 0; i < threadPoolSize + 1; i++) { + executorService.submit(runnable); + } + + // count the number of current running LogAggregationService threads + int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount(); + assertEquals(threadPoolSize, runningThread); + } + finally { + wLock.unlock(); + } + + logAggregationService.stop(); + logAggregationService.close(); + + // restore the original configurations to avoid side effects + conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + initThreadPoolSize); + } + + @Test + public void testInvalidThreadPoolSizeNaN() throws IOException { + testInvalidThreadPoolSizeValue("NaN"); + } + + @Test + public void testInvalidThreadPoolSizeNegative() throws IOException { + testInvalidThreadPoolSizeValue("-100"); + } + + @Test + public void testInvalidThreadPoolSizeXLarge() throws IOException { + testInvalidThreadPoolSizeValue("11111111111"); + } + + private void testInvalidThreadPoolSizeValue(final String threadPoolSize) + throws IOException { + Supplier<Boolean> isInputInvalid = new Supplier<Boolean>() { + @Override + public Boolean get() { + try { + int value = Integer.parseInt(threadPoolSize); + return value <= 0; + } catch (NumberFormatException ex) { + return true; + } + } + }; + + assertTrue("The thread pool size must be invalid to use with this " + + "method", isInputInvalid.get()); + + + // store configured thread pool size temporarily for restoration + int initThreadPoolSize = conf.getInt(YarnConfiguration + .NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE); + + conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + threadPoolSize); + + DeletionService delSrvc = mock(DeletionService.class); + + LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class); + when(dirSvc.getLogDirs()).thenThrow(new RuntimeException()); + + LogAggregationService logAggregationService = + new LogAggregationService(dispatcher, this.context, delSrvc,
dirSvc); + + logAggregationService.init(this.conf); + logAggregationService.start(); + + ThreadPoolExecutor executorService = (ThreadPoolExecutor) + logAggregationService.threadPool; + assertEquals("The thread pool size should be set to the value of YARN" + + ".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured " + + " thread pool size is " + "invalid.", + YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + executorService.getMaximumPoolSize()); + + logAggregationService.stop(); + logAggregationService.close(); + + // restore original configuration to avoid side effects + conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE, + initThreadPoolSize); + } + @Test(timeout=20000) public void testStopAfterError() throws Exception { DeletionService delSrvc = mock(DeletionService.class);