Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 8270018E43 for ; Thu, 18 Feb 2016 08:49:31 +0000 (UTC) Received: (qmail 8146 invoked by uid 500); 18 Feb 2016 08:49:31 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 8081 invoked by uid 500); 18 Feb 2016 08:49:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 8068 invoked by uid 99); 18 Feb 2016 08:49:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Feb 2016 08:49:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BF6C1DFF67; Thu, 18 Feb 2016 08:49:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vvasudev@apache.org To: common-commits@hadoop.apache.org Date: Thu, 18 Feb 2016 08:49:30 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] hadoop git commit: MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana. Repository: hadoop Updated Branches: refs/heads/branch-2 d4203c9aa -> 212c519ad refs/heads/trunk c1afac3a9 -> 2440671a1 MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2440671a Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2440671a Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2440671a Branch: refs/heads/trunk Commit: 2440671a117f165dcda5056404bc898df3c50803 Parents: c1afac3 Author: Varun Vasudev Authored: Thu Feb 18 14:15:08 2016 +0530 Committer: Varun Vasudev Committed: Thu Feb 18 14:18:36 2016 +0530 ---------------------------------------------------------------------- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapred/LocalContainerLauncher.java | 4 +-- .../v2/app/commit/CommitterEventHandler.java | 3 +- .../mapreduce/v2/app/job/impl/JobImpl.java | 3 +- .../v2/app/launcher/ContainerLauncherImpl.java | 3 +- .../mapred/LocalDistributedCacheManager.java | 5 ++-- .../apache/hadoop/mapred/LocalJobRunner.java | 8 +++-- .../hadoop/mapred/LocatedFileStatusFetcher.java | 4 +-- .../java/org/apache/hadoop/mapred/TaskLog.java | 31 ++++++++++---------- .../mapred/lib/MultithreadedMapRunner.java | 4 ++- .../lib/output/TestFileOutputCommitter.java | 4 +-- .../mapreduce/v2/hs/HistoryFileManager.java | 6 ++-- .../hadoop/mapreduce/v2/hs/JobHistory.java | 3 +- .../apache/hadoop/mapred/ShuffleHandler.java | 6 ++-- .../org/apache/hadoop/examples/pi/Util.java | 5 ++-- 15 files changed, 53 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 0b8c818..da28bc6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -310,6 +310,9 @@ Release 2.9.0 - UNRELEASED MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter) + MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in + mapreduce. (Sidharta Seethana via vvasudev) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java index 1a0d5fb..da118c5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java @@ -26,7 +26,6 @@ import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; @@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements // make it a daemon thread so that the process can exit even if the task is // not interruptible taskRunner = - Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). + HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder(). setDaemon(true).setNameFormat("uber-SubtaskRunner").build()); // create and start an event handling thread eventHandler = new Thread(new EventHandler(), "uber-EventHandler"); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java index b53955f..0b1be70 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java @@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; @@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService tfBuilder.setThreadFactory(backingTf); } ThreadFactory tf = tfBuilder.build(); - launcherPool = new ThreadPoolExecutor(5, 5, 1, + launcherPool = new HadoopThreadPoolExecutor(5, 5, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); eventHandlingThread = new Thread(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index 5ed0762..c8c5ce9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; @@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, .setNameFormat("Job Fail Wait Timeout Monitor #%d") .setDaemon(true) .build(); - this.executor = new ScheduledThreadPoolExecutor(1, threadFactory); + this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory); // This "this leak" is okay because the retained pointer is in an // instance variable. http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java index a7e966c..189e2ef 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java @@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse; @@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements "ContainerLauncher #%d").setDaemon(true).build(); // Start with a default core-pool size of 10 and change it dynamically. - launcherPool = new ThreadPoolExecutor(initialPoolSize, + launcherPool = new HadoopThreadPoolExecutor(initialPoolSize, Integer.MAX_VALUE, 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java index 8606ede..3b87197 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java @@ -35,7 +35,6 @@ import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicLong; @@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileContext; -import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.LocalDirAllocator; import org.apache.hadoop.fs.Path; @@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.util.ConverterUtils; @@ -121,7 +120,7 @@ class LocalDistributedCacheManager { ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("LocalDistributedCacheManager Downloader #%d") .build(); - exec = Executors.newCachedThreadPool(tf); + exec = HadoopExecutors.newCachedThreadPool(tf); Path destPath = localDirAllocator.getLocalPathForWrite(".", conf); Map> resourcesToPaths = Maps.newHashMap(); for (LocalResource resource : localResources.values()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java index 45d3cc5..37c147d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java @@ -32,7 +32,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -74,6 +73,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** Implements MapReduce locally, in-process, for debugging. */ @InterfaceAudience.Private @@ -428,7 +428,8 @@ public class LocalJobRunner implements ClientProtocol { ThreadFactory tf = new ThreadFactoryBuilder() .setNameFormat("LocalJobRunner Map Task Executor #%d") .build(); - ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf); + ExecutorService executor = HadoopExecutors.newFixedThreadPool( + maxMapThreads, tf); return executor; } @@ -454,7 +455,8 @@ public class LocalJobRunner implements ClientProtocol { LOG.debug("Reduce tasks to process: " + this.numReduceTasks); // Create a new executor service to drain the work queue. - ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads); + ExecutorService executor = HadoopExecutors.newFixedThreadPool( + maxReduceThreads); return executor; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java index 87114ad..a039bc9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java @@ -24,7 +24,6 @@ import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; @@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** * Utility class to fetch block locations for specified Input paths using a @@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher { IOException { int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS, FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS); - rawExec = Executors.newFixedThreadPool( + rawExec = HadoopExecutors.newFixedThreadPool( numThreads, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("GetFileInfo #%d").build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java index e07b5be..bf838c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java @@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.log4j.Appender; import org.apache.log4j.LogManager; @@ -327,22 +328,22 @@ public class TaskLog { public static ScheduledExecutorService createLogSyncer() { final ScheduledExecutorService scheduler = - Executors.newSingleThreadScheduledExecutor( - new ThreadFactory() { - @Override - public Thread newThread(Runnable r) { - final Thread t = Executors.defaultThreadFactory().newThread(r); - t.setDaemon(true); - t.setName("Thread for syncLogs"); - return t; - } - }); + HadoopExecutors.newSingleThreadScheduledExecutor( + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + t.setName("Thread for syncLogs"); + return t; + } + }); ShutdownHookManager.get().addShutdownHook(new Runnable() { - @Override - public void run() { - TaskLog.syncLogsShutdown(scheduler); - } - }, 50); + @Override + public void run() { + TaskLog.syncLogsShutdown(scheduler); + } + }, 50); scheduler.scheduleWithFixedDelay( new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java index 98d794b..05339bc 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java @@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords; import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import java.io.IOException; import java.util.concurrent.*; @@ -84,7 +85,8 @@ public class MultithreadedMapRunner // Creating a threadpool of the configured size to execute the Mapper // map method in parallel. - executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, + executorService = new HadoopThreadPoolExecutor(numberOfThreads, + numberOfThreads, 0L, TimeUnit.MILLISECONDS, new BlockingArrayQueue (numberOfThreads)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java index eba513b..20d8ab5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java @@ -25,10 +25,10 @@ import java.io.IOException; import java.net.URI; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import junit.framework.TestCase; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase { }; } - final ExecutorService executor = Executors.newFixedThreadPool(2); + final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2); try { for (int i = 0; i < taCtx.length; i++) { final int taskIdx = i; http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java index 6be0d27..677d5c2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java @@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.util.ShutdownThreadsHelper; +import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import com.google.common.annotations.VisibleForTesting; @@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService { JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT); ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat( "MoveIntermediateToDone Thread #%d").build(); - moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads, - 1, TimeUnit.HOURS, new LinkedBlockingQueue(), tf); + moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads, + numMoveThreads, 1, TimeUnit.HOURS, + new LinkedBlockingQueue(), tf); super.serviceInit(conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java index 41bc90a..45075c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java @@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.Service; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.event.EventHandler; @@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext { ((Service) storage).start(); } - scheduledExecutor = new ScheduledThreadPoolExecutor(2, + scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2, new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d") .build()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java index 2fb7811..0d6e900 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java @@ -46,7 +46,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executors; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.ssl.SSLFactory; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Shell; +import org.apache.hadoop.util.concurrent.HadoopExecutors; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; @@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService { .build(); selector = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(bossFactory), - Executors.newCachedThreadPool(workerFactory), + HadoopExecutors.newCachedThreadPool(bossFactory), + HadoopExecutors.newCachedThreadPool(workerFactory), maxShuffleThreads); super.serviceInit(new Configuration(conf)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java ---------------------------------------------------------------------- diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java index 8afc1bd..e74c091 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java @@ -35,7 +35,6 @@ import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; @@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.util.ToolRunner; import com.google.common.base.Charsets; +import org.apache.hadoop.util.concurrent.HadoopExecutors; /** Utility methods */ public class Util { @@ -157,7 +157,8 @@ public class Util { /** Execute the callables by a number of threads */ public static > void execute(int nThreads, List callables ) throws InterruptedException, ExecutionException { - final ExecutorService executor = Executors.newFixedThreadPool(nThreads); + final ExecutorService executor = HadoopExecutors.newFixedThreadPool( + nThreads); final List> futures = executor.invokeAll(callables); for(Future f : futures) f.get();