Reply-To: mapreduce-dev@hadoop.apache.org
Subject: svn commit: r1609878 [1/2] - in /hadoop/common/branches/YARN-1051/hadoop-mapreduce-project: ./ dev-support/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-c...
Date: Sat, 12 Jul 2014 02:25:03 -0000
To: mapreduce-commits@hadoop.apache.org
From: subru@apache.org
X-Mailer: svnmailer-1.0.9
Message-Id: <20140712022504.662DC2388C8B@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Author: subru
Date: Sat Jul 12 02:24:40 2014
New Revision: 1609878

URL: http://svn.apache.org/r1609878
Log:
syncing YARN-1051 branch with trunk

Added:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
      - copied from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
      - copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
      - copied unchanged from r1605891, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java
Removed:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/HostUtil.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFixedLengthInputFormat.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
    hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFixedLengthInputFormat.java

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1603348-1609877

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- 
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt Sat Jul 12 02:24:40 2014
@@ -145,6 +145,24 @@ Trunk (Unreleased)
     MAPREDUCE-5867. Fix NPE in KillAMPreemptionPolicy related to
     ProportionalCapacityPreemptionPolicy (Sunil G via devaraj)
 
+Release 2.6.0 - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    MAPREDUCE-5866. TestFixedLengthInputFormat fails in windows.
+    (Varun Vasudev via cnauroth)
+
+    MAPREDUCE-5956. Made MR AM not use maxAttempts to determine if the current
+    attempt is the last retry. (Wangda Tan via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -213,6 +231,12 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid
     occassional failures. (Mit Desai via vinodkv)
 
+    MAPREDUCE-5896. InputSplits should indicate which locations have the block
+    cached in memory. (Sandy Ryza via kasha)
+
+    MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
+    (Maysam Yabandeh via kasha)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -267,6 +291,19 @@ Release 2.5.0 - UNRELEASED
     MAPREDUCE-5924. Changed TaskAttemptImpl to ignore TA_COMMIT_PENDING event
     at COMMIT_PENDING state. (Zhijie Shen via jianhe)
 
+    MAPREDUCE-5939. StartTime showing up as the epoch time in JHS UI after
+    upgrade (Chen He via jlowe)
+
+    MAPREDUCE-5900. Changed to the interpret container preemption exit code as a
+    task attempt killing event. (Mayank Bansal via zjshen)
+
+    MAPREDUCE-5868. Fixed an issue with TestPipeApplication that was causing the
+    nightly builds to fail. (Akira Ajisaka via vinodkv)
+
+    MAPREDUCE-5517. Fixed MapReduce ApplicationMaster to not validate reduce side
+    resource configuration for deciding uber-mode on map-only jobs. (Siqi Li via
+    vinodkv)
+
 Release 2.4.1 - 2014-06-23
 
   INCOMPATIBLE CHANGES
@@ -275,6 +312,10 @@ Release 2.4.1 - 2014-06-23
 
   IMPROVEMENTS
 
+    MAPREDUCE-5830. 
Added back the private API HostUtil.getTaskLogUrl(..) for
+    binary compatibility with older clients like Hive 0.13. (Akira Ajisaka via
+    vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Propchange: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1603348-1609877

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Sat Jul 12 02:24:40 2014
@@ -475,8 +475,8 @@
- - + +

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -186,7 +186,6 @@ public class MRAppMaster extends Composi
   private final int nmPort;
   private final int nmHttpPort;
   protected final MRAppMetrics metrics;
-  private final int maxAppAttempts;
   private Map completedTasksFromPreviousRun;
   private List amInfos;
   private AppContext context;
@@ -227,14 +226,14 @@ public class MRAppMaster extends Composi
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      long appSubmitTime, int maxAppAttempts) {
+      long appSubmitTime) {
     this(applicationAttemptId, containerId, nmHost, nmPort, nmHttpPort,
-        new SystemClock(), appSubmitTime, maxAppAttempts);
+        new SystemClock(), appSubmitTime);
   }
 
   public MRAppMaster(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String nmHost, int nmPort, int nmHttpPort,
-      Clock clock, long appSubmitTime, int maxAppAttempts) {
+      Clock clock, long appSubmitTime) {
     super(MRAppMaster.class.getName());
     this.clock = clock;
     this.startTime = clock.getTime();
@@ -245,7 +244,6 @@ public class MRAppMaster extends Composi
     this.nmPort = nmPort;
     this.nmHttpPort = nmHttpPort;
     this.metrics = MRAppMetrics.create();
-    this.maxAppAttempts = maxAppAttempts;
     logSyncer = TaskLog.createLogSyncer();
     LOG.info("Created MRAppMaster for application " + applicationAttemptId);
   }
@@ -258,12 +256,6 @@ public class MRAppMaster extends Composi
 
     context = new RunningAppContext(conf);
 
-    ((RunningAppContext)context).computeIsLastAMRetry();
-    LOG.info("The specific max attempts: " + maxAppAttempts +
-        " for application: " + appAttemptID.getApplicationId().getId() +
-        ". 
Attempt num: " + appAttemptID.getAttemptId() +
-        " is last retry: " + isLastAMRetry);
-
     // Job name is the same as the app name util we support DAG of jobs
     // for an app later
     appName = conf.get(MRJobConfig.JOB_NAME, "");
@@ -1007,8 +999,8 @@ public class MRAppMaster extends Composi
       successfullyUnregistered.set(true);
     }
 
-    public void computeIsLastAMRetry() {
-      isLastAMRetry = appAttemptID.getAttemptId() >= maxAppAttempts;
+    public void resetIsLastAMRetry() {
+      isLastAMRetry = false;
     }
   }
 
@@ -1388,8 +1380,6 @@ public class MRAppMaster extends Composi
           System.getenv(Environment.NM_HTTP_PORT.name());
       String appSubmitTimeStr =
           System.getenv(ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      String maxAppAttempts =
-          System.getenv(ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
 
       validateInputParam(containerIdStr,
           Environment.CONTAINER_ID.name());
@@ -1399,8 +1389,6 @@ public class MRAppMaster extends Composi
           Environment.NM_HTTP_PORT.name());
       validateInputParam(appSubmitTimeStr,
           ApplicationConstants.APP_SUBMIT_TIME_ENV);
-      validateInputParam(maxAppAttempts,
-          ApplicationConstants.MAX_APP_ATTEMPTS_ENV);
 
       ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
       ApplicationAttemptId applicationAttemptId =
@@ -1411,8 +1399,7 @@ public class MRAppMaster extends Composi
       MRAppMaster appMaster =
           new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
               Integer.parseInt(nodePortString),
-              Integer.parseInt(nodeHttpPortString), appSubmitTime,
-              Integer.parseInt(maxAppAttempts));
+              Integer.parseInt(nodeHttpPortString), appSubmitTime);
       ShutdownHookManager.get().addShutdownHook(
         new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
       JobConf conf = new JobConf(new YarnConfiguration());

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Jul 12 02:24:40 2014
@@ -1218,22 +1218,25 @@ public class JobImpl implements org.apac
       boolean smallNumReduceTasks = (numReduceTasks <= sysMaxReduces);
       boolean smallInput = (dataInputLength <= sysMaxBytes);
       // ignoring overhead due to UberAM and statics as negligible here:
+      long requiredMapMB = conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0);
+      long requiredReduceMB = conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0);
+      long requiredMB = Math.max(requiredMapMB, requiredReduceMB);
+      int requiredMapCores = conf.getInt(
+          MRJobConfig.MAP_CPU_VCORES,
+          MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+      int requiredReduceCores = conf.getInt(
+          MRJobConfig.REDUCE_CPU_VCORES,
+          MRJobConfig.DEFAULT_REDUCE_CPU_VCORES);
+      int requiredCores = Math.max(requiredMapCores, requiredReduceCores);
+      if (numReduceTasks == 0) {
+        requiredMB = requiredMapMB;
+        requiredCores = requiredMapCores;
+      }
       boolean smallMemory =
-          ( (Math.max(conf.getLong(MRJobConfig.MAP_MEMORY_MB, 0),
-              conf.getLong(MRJobConfig.REDUCE_MEMORY_MB, 0))
-              <= sysMemSizeForUberSlot)
-              || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT));
-      boolean smallCpu =
-          (
-              Math.max(
-                  conf.getInt(
-                      MRJobConfig.MAP_CPU_VCORES,
-                      MRJobConfig.DEFAULT_MAP_CPU_VCORES),
-                  conf.getInt(
-                      MRJobConfig.REDUCE_CPU_VCORES,
-                      
MRJobConfig.DEFAULT_REDUCE_CPU_VCORES))
-              <= sysCPUSizeForUberSlot
-          );
+          (requiredMB <= sysMemSizeForUberSlot)
+          || (sysMemSizeForUberSlot == JobConf.DISABLED_MEMORY_LIMIT);
+
+      boolean smallCpu = requiredCores <= sysCPUSizeForUberSlot;
       boolean notChainJob = !isChainJob(conf);
 
       // User has overall veto power over uberization, or user can modify

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sat Jul 12 02:24:40 2014
@@ -185,7 +185,7 @@ public abstract class RMCommunicator ext
         // if unregistration failed, isLastAMRetry needs to be recalculated
         // to see whether AM really has the chance to retry
         RunningAppContext raContext = (RunningAppContext) context;
-        raContext.computeIsLastAMRetry();
+        raContext.resetIsLastAMRetry();
       }
     }
 

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.client.api.NMTokenCache;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.RackResolver;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -143,15 +144,21 @@ public class RMContainerAllocator extend
   private int lastCompletedTasks = 0;
 
   private boolean recalculateReduceSchedule = false;
-  private int mapResourceReqt;//memory
-  private int reduceResourceReqt;//memory
+  private int mapResourceRequest;//memory
+  private int reduceResourceRequest;//memory
 
   private boolean reduceStarted = false;
   private float maxReduceRampupLimit = 0;
   private float maxReducePreemptionLimit = 0;
+  /**
+   * after this threshold, if the container request is not allocated, it is
+   * considered delayed.
+   */
+  private long allocationDelayThresholdMs = 0;
   private float reduceSlowStart = 0;
   private long retryInterval;
   private long retrystartTime;
+  private Clock clock;
 
   private final AMPreemptionPolicy preemptionPolicy;
 
@@ -166,6 +173,7 @@ public class RMContainerAllocator extend
     super(clientService, context);
     this.preemptionPolicy = preemptionPolicy;
     this.stopped = new AtomicBoolean(false);
+    this.clock = context.getClock();
   }
 
   @Override
@@ -180,6 +188,9 @@ public class RMContainerAllocator extend
     maxReducePreemptionLimit = conf.getFloat(
         MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+    allocationDelayThresholdMs = conf.getInt(
+        MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+        MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
     RackResolver.init(conf);
     retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
                                 MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -246,7 +257,7 @@ public class RMContainerAllocator extend
             getJob().getTotalMaps(), completedMaps,
             scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
             assignedRequests.maps.size(), assignedRequests.reduces.size(),
-            mapResourceReqt, reduceResourceReqt,
+            mapResourceRequest, reduceResourceRequest,
             pendingReduces.size(),
             maxReduceRampupLimit, reduceSlowStart);
       recalculateReduceSchedule = false;
@@ -268,6 +279,18 @@ public class RMContainerAllocator extend
     scheduleStats.log("Final Stats: ");
   }
 
+  @Private
+  @VisibleForTesting
+  AssignedRequests getAssignedRequests() {
+    return assignedRequests;
+  }
+
+  @Private
+  @VisibleForTesting
+  ScheduledRequests getScheduledRequests() {
+    return scheduledRequests;
+  }
+
   public boolean getIsReduceStarted() {
     return reduceStarted;
   }
@@ -303,16 +326,16 @@ public class RMContainerAllocator extend
     int supportedMaxContainerCapability =
         getMaxContainerCapability().getMemory();
     if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
-      if 
(mapResourceReqt == 0) {
-        mapResourceReqt = reqEvent.getCapability().getMemory();
+      if (mapResourceRequest == 0) {
+        mapResourceRequest = reqEvent.getCapability().getMemory();
         eventHandler.handle(new JobHistoryEvent(jobId,
             new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
-                mapResourceReqt)));
-        LOG.info("mapResourceReqt:"+mapResourceReqt);
-        if (mapResourceReqt > supportedMaxContainerCapability) {
+                mapResourceRequest)));
+        LOG.info("mapResourceRequest:"+ mapResourceRequest);
+        if (mapResourceRequest > supportedMaxContainerCapability) {
           String diagMsg = "MAP capability required is more than the supported " +
-              "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
-              mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+              "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+              mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
           LOG.info(diagMsg);
           eventHandler.handle(new JobDiagnosticsUpdateEvent(
               jobId, diagMsg));
@@ -320,20 +343,20 @@ public class RMContainerAllocator extend
         }
       }
       //set the rounded off memory
-      reqEvent.getCapability().setMemory(mapResourceReqt);
+      reqEvent.getCapability().setMemory(mapResourceRequest);
       scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
     } else {
-      if (reduceResourceReqt == 0) {
-        reduceResourceReqt = reqEvent.getCapability().getMemory();
+      if (reduceResourceRequest == 0) {
+        reduceResourceRequest = reqEvent.getCapability().getMemory();
         eventHandler.handle(new JobHistoryEvent(jobId,
             new NormalizedResourceEvent(
                 org.apache.hadoop.mapreduce.TaskType.REDUCE,
-                reduceResourceReqt)));
-        LOG.info("reduceResourceReqt:"+reduceResourceReqt);
-        if (reduceResourceReqt > supportedMaxContainerCapability) {
+                reduceResourceRequest)));
+        LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+        if (reduceResourceRequest > supportedMaxContainerCapability) {
           String diagMsg = "REDUCE capability 
required is more than the " +
               "supported max container capability in the cluster. Killing the " +
-              "Job. reduceResourceReqt: " + reduceResourceReqt +
+              "Job. reduceResourceRequest: " + reduceResourceRequest +
               " maxContainerCapability:" + supportedMaxContainerCapability;
           LOG.info(diagMsg);
           eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -342,7 +365,7 @@ public class RMContainerAllocator extend
         }
       }
       //set the rounded off memory
-      reqEvent.getCapability().setMemory(reduceResourceReqt);
+      reqEvent.getCapability().setMemory(reduceResourceRequest);
       if (reqEvent.getEarlierAttemptFailed()) {
         //add to the front of queue for fail fast
         pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -394,8 +417,22 @@ public class RMContainerAllocator extend
     return host;
   }
 
-  private void preemptReducesIfNeeded() {
-    if (reduceResourceReqt == 0) {
+  @Private
+  @VisibleForTesting
+  synchronized void setReduceResourceRequest(int mem) {
+    this.reduceResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  synchronized void setMapResourceRequest(int mem) {
+    this.mapResourceRequest = mem;
+  }
+
+  @Private
+  @VisibleForTesting
+  void preemptReducesIfNeeded() {
+    if (reduceResourceRequest == 0) {
       return; //no reduces
     }
     //check if reduces have taken over the whole cluster and there are
@@ -403,9 +440,9 @@ public class RMContainerAllocator extend
     if (scheduledRequests.maps.size() > 0) {
       int memLimit = getMemLimit();
       int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
-          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+          assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
       //availableMemForMap must be sufficient to run atleast 1 map
-      if (availableMemForMap < mapResourceReqt) {
+      if (availableMemForMap < mapResourceRequest) {
         //to make sure new containers are given to maps and not reduces
         //ramp down all scheduled reduces if any
         //(since reduces are scheduled at higher priority than maps)
@@ 
-414,22 +451,40 @@ public class RMContainerAllocator extend
           pendingReduces.add(req);
         }
         scheduledRequests.reduces.clear();
-
-        //preempt for making space for at least one map
-        int premeptionLimit = Math.max(mapResourceReqt,
-            (int) (maxReducePreemptionLimit * memLimit));
-
-        int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
-            premeptionLimit);
-
-        int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
-        toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-
-        LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
-        assignedRequests.preemptReduce(toPreempt);
+
+        //do further checking to find the number of map requests that were
+        //hanging around for a while
+        int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+        if (hangingMapRequests > 0) {
+          //preempt for making space for at least one map
+          int premeptionLimit = Math.max(mapResourceRequest,
+              (int) (maxReducePreemptionLimit * memLimit));
+
+          int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+              premeptionLimit);
+
+          int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+          toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+          LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+          assignedRequests.preemptReduce(toPreempt);
+        }
       }
     }
   }
+
+  private int getNumOfHangingRequests(Map requestMap) {
+    if (allocationDelayThresholdMs <= 0)
+      return requestMap.size();
+    int hangingRequests = 0;
+    long currTime = clock.getTime();
+    for (ContainerRequest request: requestMap.values()) {
+      long delay = currTime - request.requestTimeMs;
+      if (delay > allocationDelayThresholdMs)
+        hangingRequests++;
+    }
+    return hangingRequests;
+  }
 
   @Private
   public void scheduleReduces(
@@ -664,7 +719,8 @@ public class RMContainerAllocator extend
   @VisibleForTesting
   public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
       TaskAttemptId attemptID) {
-    if 
(cont.getExitStatus() == ContainerExitStatus.ABORTED) {
+    if (cont.getExitStatus() == ContainerExitStatus.ABORTED
+        || cont.getExitStatus() == ContainerExitStatus.PREEMPTED) {
       // killed by framework
       return new TaskAttemptEvent(attemptID,
           TaskAttemptEventType.TA_KILL);
@@ -715,11 +771,13 @@ public class RMContainerAllocator extend
   @Private
   public int getMemLimit() {
     int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
-    return headRoom + assignedRequests.maps.size() * mapResourceReqt +
-       assignedRequests.reduces.size() * reduceResourceReqt;
+    return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+       assignedRequests.reduces.size() * reduceResourceRequest;
   }
-
-  private class ScheduledRequests {
+
+  @Private
+  @VisibleForTesting
+  class ScheduledRequests {
 
     private final LinkedList earlierFailedMaps =
       new LinkedList();
@@ -729,7 +787,8 @@ public class RMContainerAllocator extend
       new HashMap>();
     private final Map> mapsRackMapping =
       new HashMap>();
-    private final Map maps =
+    @VisibleForTesting
+    final Map maps =
       new LinkedHashMap();
 
     private final LinkedHashMap reduces =
@@ -825,22 +884,22 @@ public class RMContainerAllocator extend
       int allocatedMemory = allocated.getResource().getMemory();
       if (PRIORITY_FAST_FAIL_MAP.equals(priority)
           || PRIORITY_MAP.equals(priority)) {
-        if (allocatedMemory < mapResourceReqt
+        if (allocatedMemory < mapResourceRequest
             || maps.isEmpty()) {
           LOG.info("Cannot assign container " + allocated
               + " for a map as either "
-              + " container memory less than required " + mapResourceReqt
+              + " container memory less than required " + mapResourceRequest
               + " or no pending map tasks - maps.isEmpty="
               + maps.isEmpty());
           isAssignable = false;
         }
       }
       else if (PRIORITY_REDUCE.equals(priority)) {
-        if (allocatedMemory < reduceResourceReqt
+        if (allocatedMemory < reduceResourceRequest
             || reduces.isEmpty()) {
           LOG.info("Cannot assign container " + allocated
               + " for a reduce as either "
-              + " container memory less than 
required " + reduceResourceReqt + + " container memory less than required " + reduceResourceRequest + " or no pending reduce tasks - reduces.isEmpty=" + reduces.isEmpty()); isAssignable = false; @@ -1119,14 +1178,18 @@ public class RMContainerAllocator extend } } - private class AssignedRequests { + @Private + @VisibleForTesting + class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); private final LinkedHashMap maps = new LinkedHashMap(); - private final LinkedHashMap reduces = + @VisibleForTesting + final LinkedHashMap reduces = new LinkedHashMap(); - private final Set preemptionWaitingReduces = + @VisibleForTesting + final Set preemptionWaitingReduces = new HashSet(); void add(Container container, TaskAttemptId tId) { Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Sat Jul 12 02:24:40 2014 @@ -29,8 +29,10 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import 
org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -96,6 +98,8 @@ public abstract class RMContainerRequest super(clientService, context); } + @Private + @VisibleForTesting static class ContainerRequest { final TaskAttemptId attemptID; final Resource capability; @@ -103,20 +107,39 @@ public abstract class RMContainerRequest final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; - + /** + * the time when this request object was formed; can be used to avoid + * aggressive preemption for recently placed requests + */ + final long requestTimeMs; + public ContainerRequest(ContainerRequestEvent event, Priority priority) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), event.getRacks(), priority); } - + + public ContainerRequest(ContainerRequestEvent event, Priority priority, + long requestTimeMs) { + this(event.getAttemptID(), event.getCapability(), event.getHosts(), + event.getRacks(), priority, requestTimeMs); + } + + public ContainerRequest(TaskAttemptId attemptID, + Resource capability, String[] hosts, String[] racks, + Priority priority) { + this(attemptID, capability, hosts, racks, priority, + System.currentTimeMillis()); + } + public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority) { + Resource capability, String[] hosts, String[] racks, + Priority priority, long requestTimeMs) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; + this.requestTimeMs = requestTimeMs; } public String toString() { Modified: 
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Sat Jul 12 02:24:40 2014 @@ -24,8 +24,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.util.EnumSet; -import org.junit.Assert; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -92,6 +90,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.Token; import org.apache.hadoop.yarn.event.EventHandler; @@ -101,6 +100,7 @@ import org.apache.hadoop.yarn.state.Stat import org.apache.hadoop.yarn.state.StateMachineFactory; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; +import org.junit.Assert; /** @@ -228,8 +228,8 @@ public class MRApp extends MRAppMaster { int maps, int reduces, boolean autoComplete, String testName, boolean cleanOnStart, int startCount, Clock clock, boolean 
unregistered,
       String assignedQueue) {
-    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
-        .currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock,
+        System.currentTimeMillis());
     this.testWorkDir = new File("target", testName);
     testAbsPath = new Path(testWorkDir.getAbsolutePath());
     LOG.info("PathUsed: " + testAbsPath);
@@ -573,7 +573,8 @@ public class MRApp extends MRAppMaster {
       Resource resource = Resource.newInstance(1234, 2);
       ContainerTokenIdentifier containerTokenIdentifier =
           new ContainerTokenIdentifier(cId, nodeId.toString(), "user",
-          resource, System.currentTimeMillis() + 10000, 42, 42);
+          resource, System.currentTimeMillis() + 10000, 42, 42,
+          Priority.newInstance(0), 0);
       Token containerToken = newContainerToken(nodeId, "password".getBytes(),
             containerTokenIdentifier);
       Container container = Container.newInstance(cId, nodeId,

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Sat Jul 12 02:24:40 2014
@@ -253,6 +253,12 @@ public class TestJobEndNotifier extends
     HttpServer2 server =
startHttpServer();
     MRApp app = spy(new MRAppWithCustomContainerAllocator(2, 2, false,
         this.getClass().getName(), true, 2, false));
+    // Currently, we will have isLastRetry always equals to false at beginning
+    // of MRAppMaster, except staging area exists or commit already started at
+    // the beginning.
+    // Now manually set isLastRetry to true and this should reset to false when
+    // unregister failed.
+    app.isLastAMRetry = true;
     doNothing().when(app).sysexit();
     JobConf conf = new JobConf();
     conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
@@ -265,12 +271,11 @@ public class TestJobEndNotifier extends
     // Now shutdown. User should see FAILED state.
     // Unregistration fails: isLastAMRetry is recalculated, this is
     app.shutDownJob();
-    Assert.assertTrue(app.isLastAMRetry());
-    Assert.assertEquals(1, JobEndServlet.calledTimes);
-    Assert.assertEquals("jobid=" + job.getID() + "&status=FAILED",
-        JobEndServlet.requestUri.getQuery());
-    Assert.assertEquals(JobState.FAILED.toString(),
-      JobEndServlet.foundJobState);
+    Assert.assertFalse(app.isLastAMRetry());
+    // Since it's not last retry, JobEndServlet didn't called
+    Assert.assertEquals(0, JobEndServlet.calledTimes);
+    Assert.assertNull(JobEndServlet.requestUri);
+    Assert.assertNull(JobEndServlet.foundJobState);
     server.stop();
   }


Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Sat Jul 12 02:24:40 2014
@@ -118,7 +118,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+            System.currentTimeMillis());
     JobConf conf = new JobConf();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -147,8 +147,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -186,8 +185,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -225,8 +223,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -264,8 +261,7 @@ public class TestMRAppMaster {
     ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
     MRAppMaster appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS,
-            false, false);
+            System.currentTimeMillis(), false, false);
     boolean caught = false;
     try {
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
@@ -285,8 +281,9 @@ public class TestMRAppMaster {
   @Test (timeout = 30000)
   public void testMRAppMasterMaxAppAttempts() throws IOException,
       InterruptedException {
-    int[] maxAppAttemtps = new int[] { 1, 2, 3 };
-    Boolean[] expectedBools = new Boolean[]{ true, true, false };
+    // No matter what's the maxAppAttempt or attempt id, the isLastRetry always
+    // equals to false
+    Boolean[] expectedBools = new Boolean[]{ false, false, false };

     String applicationAttemptIdStr = "appattempt_1317529182569_0004_000002";
     String containerIdStr = "container_1317529182569_0004_000002_1";
@@ -301,10 +298,10 @@ public class TestMRAppMaster {
     File stagingDir =
         new File(MRApps.getStagingAreaDir(conf, userName).toString());
     stagingDir.mkdirs();
-    for (int i = 0; i < maxAppAttemtps.length; ++i) {
+    for (int i = 0; i < expectedBools.length; ++i) {
       MRAppMasterTest appMaster =
           new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-              System.currentTimeMillis(), maxAppAttemtps[i], false, true);
+              System.currentTimeMillis(), false, true);
       MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
       assertEquals("isLastAMRetry is correctly computed.", expectedBools[i],
           appMaster.isLastAMRetry());
@@ -399,7 +396,7 @@ public class TestMRAppMaster {

     MRAppMasterTest appMaster =
         new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
-            System.currentTimeMillis(), 1, false, true);
+            System.currentTimeMillis(), false, true);
     MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);

     // Now validate the task credentials
@@ -466,16 +463,15 @@ class MRAppMasterTest extends MRAppMaste

   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts) {
+      long submitTime) {
     this(applicationAttemptId, containerId, host, port, httpPort,
-        submitTime, maxAppAttempts, true, true);
+        submitTime, true, true);
   }
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
-      long submitTime, int maxAppAttempts, boolean overrideInit,
+      long submitTime, boolean overrideInit,
       boolean overrideStart) {
-    super(applicationAttemptId, containerId, host, port, httpPort, submitTime,
-        maxAppAttempts);
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
     this.overrideInit = overrideInit;
     this.overrideStart = overrideStart;
     mockContainerAllocator = mock(ContainerAllocator.class);

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Sat Jul 12 02:24:40 2014
@@ -18,6 +18,7 @@

 package org.apache.hadoop.mapreduce.v2.app;

+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
@@ -28,9 +29,6 @@ import static org.mockito.Mockito.when;

 import java.io.IOException;

-import org.junit.Assert;
-import junit.framework.TestCase;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -62,13 +60,14 @@ import org.apache.hadoop.yarn.exceptions
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Assert;
 import org.junit.Test;


 /**
  * Make sure that the job staging directory clean up happens.
  */
- public class TestStagingCleanup extends TestCase {
+ public class TestStagingCleanup {

    private Configuration conf = new Configuration();
    private FileSystem fs;
@@ -81,7 +80,7 @@ import org.junit.Test;
   public void testDeletionofStagingOnUnregistrationFailure()
       throws IOException {
     testDeletionofStagingOnUnregistrationFailure(2, false);
-    testDeletionofStagingOnUnregistrationFailure(1, true);
+    testDeletionofStagingOnUnregistrationFailure(1, false);
   }

   @SuppressWarnings("resource")
@@ -104,7 +103,7 @@ import org.junit.Test;
     appMaster.init(conf);
     appMaster.start();
     appMaster.shutDownJob();
-    ((RunningAppContext) appMaster.getContext()).computeIsLastAMRetry();
+    ((RunningAppContext) appMaster.getContext()).resetIsLastAMRetry();
     if (shouldHaveDeleted) {
       Assert.assertEquals(new Boolean(true), appMaster.isLastAMRetry());
       verify(fs).delete(stagingJobPath, true);
@@ -164,7 +163,11 @@ import org.junit.Test;
     verify(fs, times(0)).delete(stagingJobPath, true);
   }

-  @Test (timeout = 30000)
+  // FIXME:
+  // Disabled this test because currently, when job state=REBOOT at shutdown
+  // when lastRetry = true in RM view, cleanup will not do.
+  // This will be supported after YARN-2261 completed
+// @Test (timeout = 30000)
   public void testDeletionofStagingOnReboot() throws IOException {
     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
     fs = mock(FileSystem.class);
@@ -202,7 +205,7 @@ import org.junit.Test;
     JobId jobid = recordFactory.newRecordInstance(JobId.class);
     jobid.setAppId(appId);
     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 4);
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc);
     appMaster.init(conf);
     //simulate the process being killed
     MRAppMaster.MRAppMasterShutdownHook hook =
@@ -210,8 +213,12 @@ import org.junit.Test;
     hook.run();
     verify(fs, times(0)).delete(stagingJobPath, true);
   }
-
-  @Test (timeout = 30000)
+
+  // FIXME:
+  // Disabled this test because currently, when shutdown hook triggered at
+  // lastRetry in RM view, cleanup will not do. This should be supported after
+  // YARN-2261 completed
+// @Test (timeout = 30000)
   public void testDeletionofStagingOnKillLastTry() throws IOException {
     conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, stagingJobDir);
     fs = mock(FileSystem.class);
@@ -226,7 +233,7 @@ import org.junit.Test;
     JobId jobid = recordFactory.newRecordInstance(JobId.class);
     jobid.setAppId(appId);
     ContainerAllocator mockAlloc = mock(ContainerAllocator.class);
-    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc, 1); //no retry
+    MRAppMaster appMaster = new TestMRApp(attemptId, mockAlloc); //no retry
     appMaster.init(conf);
     assertTrue("appMaster.isLastAMRetry() is false", appMaster.isLastAMRetry());
     //simulate the process being killed
@@ -245,10 +252,10 @@ import org.junit.Test;
     boolean crushUnregistration = false;

     public TestMRApp(ApplicationAttemptId applicationAttemptId,
-        ContainerAllocator allocator, int maxAppAttempts) {
+        ContainerAllocator allocator) {
       super(applicationAttemptId, ContainerId.newInstance(
           applicationAttemptId, 1), "testhost", 2222, 3333,
-          System.currentTimeMillis(), maxAppAttempts);
+          System.currentTimeMillis());
       this.allocator = allocator;
       this.successfullyUnregistered.set(true);
     }
@@ -256,7 +263,7 @@ import org.junit.Test;
     public TestMRApp(ApplicationAttemptId applicationAttemptId,
         ContainerAllocator allocator, JobStateInternal jobStateInternal,
             int maxAppAttempts) {
-      this(applicationAttemptId, allocator, maxAppAttempts);
+      this(applicationAttemptId, allocator);
       this.jobStateInternal = jobStateInternal;
     }


Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Sat Jul 12 02:24:40 2014
@@ -657,6 +657,15 @@ public class TestJobImpl {
     conf.setInt(MRJobConfig.JOB_UBERTASK_MAXMAPS, 1);
     isUber = testUberDecision(conf);
     Assert.assertFalse(isUber);
+
+    // enable uber mode of 0 reducer no matter how much memory assigned to reducer
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, true);
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 2048);
+    conf.setInt(MRJobConfig.REDUCE_CPU_VCORES, 10);
+    isUber = testUberDecision(conf);
+    Assert.assertTrue(isUber);
   }

   private boolean
testUberDecision(Configuration conf) {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Sat Jul 12 02:24:40 2014
@@ -65,6 +65,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
@@ -795,6 +796,178 @@ public class TestTaskAttempt{
         finishTime, Long.valueOf(taImpl.getFinishTime()));
   }

+  @Test
+  public void testContainerKillAfterAssigned() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    assertEquals("Task attempt is not in assinged state",
+        taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileRunning() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
+  @Test
+  public void testContainerKillWhileCommitPending() throws Exception {
+    ApplicationId appId = ApplicationId.newInstance(1, 2);
+    ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
+        0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, new Token(),
+        new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = NodeId.newInstance("127.0.0.2", 0);
+    ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptContainerAssignedEvent(attemptId, container,
+        mock(Map.class)));
+    taImpl.handle(new TaskAttemptContainerLaunchedEvent(attemptId, 0));
+    assertEquals("Task attempt is not in running state", taImpl.getState(),
+        TaskAttemptState.RUNNING);
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_COMMIT_PENDING));
+    assertEquals("Task should be in COMMIT_PENDING state",
+        TaskAttemptStateInternal.COMMIT_PENDING, taImpl.getInternalState());
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_KILL));
+    assertFalse("InternalError occurred trying to handle TA_KILL",
+        eventHandler.internalError);
+    assertEquals("Task should be in KILLED state",
+        TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+        taImpl.getInternalState());
+  }
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;


Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncherImpl.java Sat Jul 12 02:24:40 2014
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
@@ -402,7 +403,7 @@ public class TestContainerLauncherImpl {
         1234), "password".getBytes(), new ContainerTokenIdentifier(
         contId, containerManagerAddr, "user",
         Resource.newInstance(1024, 1),
-        currentTime + 10000L, 123, currentTime));
+        currentTime + 10000L, 123, currentTime, Priority.newInstance(0), 0));
   }

   private static class ContainerManagerForTest implements ContainerManagementProtocol {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?rev=1609878&r1=1605891&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Sat Jul 12 02:24:40 2014
@@ -1959,6 +1959,22 @@ public class TestRMContainerAllocator {
     TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
         abortedStatus, attemptId);
     Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+
+    ContainerId containerId2 = ContainerId.newInstance(applicationAttemptId, 2);
+    ContainerStatus status2 = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", 0);
+
+    ContainerStatus preemptedStatus = ContainerStatus.newInstance(containerId2,
+        ContainerState.RUNNING, "", ContainerExitStatus.PREEMPTED);
+
+    TaskAttemptEvent event2 = allocator.createContainerFinishedEvent(status2,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event2.getType());
+
+    TaskAttemptEvent abortedEvent2 = allocator.createContainerFinishedEvent(
+        preemptedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent2.getType());
   }

   @Test

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java (original)
+++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/FileNameIndexUtils.java Sat Jul 12 02:24:40 2014
@@ -168,10 +168,14 @@ public class FileNameIndexUtils {
           decodeJobHistoryFileName(jobDetails[QUEUE_NAME_INDEX]));

       try{
-        indexInfo.setJobStartTime(
-          Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        if (jobDetails.length <= JOB_START_TIME_INDEX) {
+          indexInfo.setJobStartTime(indexInfo.getSubmitTime());
+        } else {
+          indexInfo.setJobStartTime(
+              Long.parseLong(decodeJobHistoryFileName(jobDetails[JOB_START_TIME_INDEX])));
+        }
       } catch (NumberFormatException e){
-        LOG.warn("Unable to parse launch time from job history file "
+        LOG.warn("Unable to parse start time from job history file "
             + jhFileName + " : " + e);
       }
     } catch (IndexOutOfBoundsException e) {

Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java?rev=1609878&r1=1609877&r2=1609878&view=diff
==============================================================================
--- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java (original)
+++
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/jobhistory/TestFileNameIndexUtils.java Sat Jul 12 02:24:40 2014 @@ -39,6 +39,17 @@ public class TestFileNameIndexUtils { + FileNameIndexUtils.DELIMITER + "%s" + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION; + private static final String OLD_FORMAT_BEFORE_ADD_START_TIME = "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + FileNameIndexUtils.DELIMITER + "%s" + + JobHistoryUtils.JOB_HISTORY_FILE_EXTENSION; + private static final String JOB_HISTORY_FILE_FORMATTER = "%s" + FileNameIndexUtils.DELIMITER + "%s" + FileNameIndexUtils.DELIMITER + "%s" @@ -236,6 +247,22 @@ public class TestFileNameIndexUtils { } @Test + public void testJobStartTimeBackwardsCompatible() throws IOException{ + String jobHistoryFile = String.format(OLD_FORMAT_BEFORE_ADD_START_TIME, + JOB_ID, + SUBMIT_TIME, + USER_NAME, + JOB_NAME_WITH_DELIMITER_ESCAPE, + FINISH_TIME, + NUM_MAPS, + NUM_REDUCES, + JOB_STATUS, + QUEUE_NAME ); + JobIndexInfo info = FileNameIndexUtils.getIndexInfo(jobHistoryFile); + Assert.assertEquals(info.getJobStartTime(), info.getSubmitTime()); + } + + @Test public void testJobHistoryFileNameBackwardsCompatible() throws IOException { JobID oldJobId = JobID.forName(JOB_ID); JobId jobId = TypeConverter.toYarn(oldJobId); Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java Sat Jul 12 02:24:40 2014 @@ -295,6 +295,15 @@ public abstract class FileInputFormat SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); + splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); + splitHosts[0], splitHosts[1])); } } else { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files @@ -538,10 +547,30 @@ public abstract class FileInputFormat= splitSize) { - return blkLocations[startIndex].getHosts(); + return new String[][] { blkLocations[startIndex].getHosts(), + 
blkLocations[startIndex].getCachedHosts() }; } long bytesInFirstBlock = bytesInThisBlock; @@ -639,7 +669,9 @@ public abstract class FileInputFormat splitSize + return new String[][] { identifyHosts(allTopos.length, racksMap), + new String[0]}; } private String[] identifyHosts(int replicationFactor, Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java Sat Jul 12 02:24:40 2014 @@ -24,6 +24,7 @@ import java.io.DataOutput; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; /** A section of an input file. 
Returned by {@link @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Public @InterfaceStability.Stable public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit - implements InputSplit { + implements InputSplitWithLocationInfo { org.apache.hadoop.mapreduce.lib.input.FileSplit fs; protected FileSplit() { fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); @@ -62,6 +63,20 @@ public class FileSplit extends org.apach length, hosts); } + /** Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts, inMemoryHosts); + } + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { this.fs = fs; } @@ -92,4 +107,9 @@ public class FileSplit extends org.apach return fs.getLocations(); } + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return fs.getLocationInfo(); + } } Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- 
hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java Sat Jul 12 02:24:40 2014 @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; @@ -51,10 +53,25 @@ public abstract class InputSplit { /** * Get the list of nodes by name where the data for the split would be local. * The locations do not need to be serialized. + * * @return a new array of the node nodes. * @throws IOException * @throws InterruptedException */ public abstract String[] getLocations() throws IOException, InterruptedException; + + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. + * + * @return list of SplitLocationInfos describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. 
+ * @throws IOException + */ + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return null; + } } \ No newline at end of file Modified: hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1609878&r1=1609877&r2=1609878&view=diff ============================================================================== --- hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original) +++ hadoop/common/branches/YARN-1051/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Sat Jul 12 02:24:40 2014 @@ -579,7 +579,17 @@ public interface MRJobConfig { MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold"; public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - + + /** + * The threshold in terms of seconds after which an unsatisfied mapper request + * triggers reducer preemption to free space. Default 0 implies that the reduces + * should be preempted immediately after allocation if there is currently no + * room for newly allocated mappers. + */ + public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.preempt.delay.sec"; + public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0; + public static final String MR_AM_ENV = MR_AM_PREFIX + "env";
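Note on the MRJobConfig hunk above: the new Javadoc describes a delay, in seconds, after which an unsatisfied mapper request triggers reducer preemption, with 0 meaning immediate preemption. The following is a minimal sketch of that timing rule only. The class name, the method shouldPreemptReducers, its parameters, and the inclusive (>=) comparison are illustrative assumptions and are not taken from this commit; only the key string and default value are copied from the hunk.

```java
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical illustration of the delay semantics documented for
 * MR_JOB_REDUCER_PREEMPT_DELAY_SEC. This is NOT the allocator code
 * from the commit; it only models the timing rule in the Javadoc.
 */
public final class ReducerPreemptDelaySketch {
    // Key and default copied from the MRJobConfig hunk.
    public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
        "mapreduce.job.reducer.preempt.delay.sec";
    public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;

    private ReducerPreemptDelaySketch() {}

    /**
     * Returns true once a mapper request has been unsatisfied for at
     * least delaySec seconds. The inclusive comparison is an assumption,
     * chosen so that a delay of 0 preempts immediately.
     */
    public static boolean shouldPreemptReducers(long unsatisfiedSinceMs,
                                                long nowMs, int delaySec) {
        long waitedMs = nowMs - unsatisfiedSinceMs;
        return waitedMs >= TimeUnit.SECONDS.toMillis(delaySec);
    }

    public static void main(String[] args) {
        long requestedAt = 1_000L;
        // Default delay of 0: preempt as soon as the request is unsatisfied.
        System.out.println(shouldPreemptReducers(requestedAt, 1_000L,
            DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC));
        // 10 second delay, 4 seconds waited: hold off.
        System.out.println(shouldPreemptReducers(requestedAt, 5_000L, 10));
        // 10 second delay, 10 seconds waited: preempt.
        System.out.println(shouldPreemptReducers(requestedAt, 11_000L, 10));
    }
}
```

In a real job the value would be read from the job configuration under the key shown above; how the application master applies it is defined by the allocator changes in this revision, not by this sketch.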