From: inigoiri@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 03 Oct 2016 22:59:08 -0000
Message-Id: <7f34445d25fa4e80bb6580dd09a0d08e@git.apache.org>
Subject: [55/57] [abbrv] hadoop git commit: MAPREDUCE-6638. Do not attempt
 to recover progress from previous job attempts if spill encryption is
 enabled. (Haibo Chen via kasha)

MAPREDUCE-6638. Do not attempt to recover progress from previous job
attempts if spill encryption is enabled. (Haibo Chen via kasha)
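Both switches involved here are ordinary job-level configuration keys. A
minimal sketch of the client-side setup this change reacts to (hypothetical
driver code, not part of this commit; the MRJobConfig constants are the ones
referenced in the diff below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SpillEncryptionExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Encrypt intermediate (spill) data written while the job runs.
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Recovery may still be enabled; with this patch the AM nevertheless
        // skips recovery for jobs with reducers when spills are encrypted,
        // since the encryption key is not persisted across attempts.
        conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
      }
    }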
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/de7a0a92
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/de7a0a92
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/de7a0a92

Branch: refs/heads/HDFS-10467
Commit: de7a0a92ca1983b35ca4beb7ab712fd700a9e6e0
Parents: 7442084
Author: Karthik Kambatla
Authored: Mon Oct 3 10:30:22 2016 -0700
Committer: Karthik Kambatla
Committed: Mon Oct 3 10:30:22 2016 -0700

----------------------------------------------------------------------
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  | 90 ++++++++++++++------
 .../hadoop/mapreduce/v2/app/TestRecovery.java | 66 ++++++++++++++
 2 files changed, 129 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index d94f8a5..4a8a90e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -149,7 +149,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
 import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.LogManager;

@@ -1303,44 +1302,77 @@ public class MRAppMaster extends CompositeService {
   }

   private void processRecovery() throws IOException{
-    if (appAttemptID.getAttemptId() == 1) {
-      return; // no need to recover on the first attempt
+    boolean attemptRecovery = shouldAttemptRecovery();
+    boolean recoverySucceeded = true;
+    if (attemptRecovery) {
+      LOG.info("Attempting to recover.");
+      try {
+        parsePreviousJobHistory();
+      } catch (IOException e) {
+        LOG.warn("Unable to parse prior job history, aborting recovery", e);
+        recoverySucceeded = false;
+      }
+    }
+
+    if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) {
+      amInfos.addAll(readJustAMInfos());
+    }
+  }
+
+  private boolean isFirstAttempt() {
+    return appAttemptID.getAttemptId() == 1;
+  }
+
+  /**
+   * Check if the current job attempt should try to recover from previous
+   * job attempts if any.
+   */
+  private boolean shouldAttemptRecovery() throws IOException {
+    if (isFirstAttempt()) {
+      return false; // no need to recover on the first attempt
     }

     boolean recoveryEnabled = getConfig().getBoolean(
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
+    if (!recoveryEnabled) {
+      LOG.info("Not attempting to recover. Recovery disabled. To enable " +
+          "recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE);
+      return false;
+    }

     boolean recoverySupportedByCommitter = isRecoverySupported();
+    if (!recoverySupportedByCommitter) {
+      LOG.info("Not attempting to recover. Recovery is not supported by " +
+          committer.getClass() + ". Use an OutputCommitter that supports" +
+          " recovery.");
+      return false;
+    }

-    // If a shuffle secret was not provided by the job client then this app
-    // attempt will generate one. However that disables recovery if there
-    // are reducers as the shuffle secret would be app attempt specific.
-    int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+    int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
+
+    // If a shuffle secret was not provided by the job client, one will be
+    // generated in this job attempt. However, that disables recovery if
+    // there are reducers as the shuffle secret would be job attempt specific.
     boolean shuffleKeyValidForRecovery =
         TokenCache.getShuffleSecretKey(jobCredentials) != null;
+    if (reducerCount > 0 && !shuffleKeyValidForRecovery) {
+      LOG.info("Not attempting to recover. The shuffle key is invalid for " +
+          "recovery.");
+      return false;
+    }

-    if (recoveryEnabled && recoverySupportedByCommitter
-        && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
-      LOG.info("Recovery is enabled. "
-          + "Will try to recover from previous life on best effort basis.");
-      try {
-        parsePreviousJobHistory();
-      } catch (IOException e) {
-        LOG.warn("Unable to parse prior job history, aborting recovery", e);
-        // try to get just the AMInfos
-        amInfos.addAll(readJustAMInfos());
-      }
-    } else {
-      LOG.info("Will not try to recover. recoveryEnabled: "
-          + recoveryEnabled + " recoverySupportedByCommitter: "
-          + recoverySupportedByCommitter + " numReduceTasks: "
-          + numReduceTasks + " shuffleKeyValidForRecovery: "
-          + shuffleKeyValidForRecovery + " ApplicationAttemptID: "
-          + appAttemptID.getAttemptId());
-      // Get the amInfos anyways whether recovery is enabled or not
-      amInfos.addAll(readJustAMInfos());
+    // If the intermediate data is encrypted, recovering the job requires
+    // access to the key. Until the encryption key is persisted, we should
+    // avoid attempts to recover.
+    boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig());
+    if (reducerCount > 0 && spillEncrypted) {
+      LOG.info("Not attempting to recover. Intermediate spill encryption" +
+          " is enabled.");
+      return false;
     }
+
+    return true;
   }

   private static FSDataInputStream getPreviousJobHistoryStream(
@@ -1440,6 +1472,10 @@ public class MRAppMaster extends CompositeService {
     return amInfos;
   }

+  public boolean recovered() {
+    return recoveredJobStartTime > 0;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a
    * workflow.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de7a0a92/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 9d5f0ae..071575a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -579,6 +579,72 @@ public class TestRecovery {
     app.verifyCompleted();
   }

+  @Test
+  public void testRecoveryWithSpillEncryption() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
+        true, ++runCount) {
+    };
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
+
+    // run the MR job at the first attempt
+    Job jobAttempt1 = app.submit(conf);
+    app.waitForState(jobAttempt1, JobState.RUNNING);
+
+    Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
+
+    // finish the map task but not the reduce task
+    Task mapper = tasks.next();
+    app.waitForState(mapper, TaskState.RUNNING);
+    TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapper, TaskState.SUCCEEDED);
+
+    // crash the first attempt of the MR job
+    app.stop();
+
+    // run the MR job again at the second attempt
+    app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    Job jobAttempt2 = app.submit(conf);
+    Assert.assertTrue("Recovery from previous job attempt is processed even " +
+        "though intermediate data encryption is enabled.", !app.recovered());
+
+    // The map task that succeeded in the previous job attempt will not be
+    // recovered because data spill encryption is enabled.
+    // Let's finish the job at the second attempt and verify its completion.
+    app.waitForState(jobAttempt2, JobState.RUNNING);
+    tasks = jobAttempt2.getTasks().values().iterator();
+    mapper = tasks.next();
+    Task reducer = tasks.next();
+
+    // finish the map task first
+    app.waitForState(mapper, TaskState.RUNNING);
+    mapAttempt = mapper.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapper, TaskState.SUCCEEDED);
+
+    // then finish the reduce task
+    TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
+    app.waitForState(redAttempt, TaskAttemptState.RUNNING);
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
+    app.waitForState(reducer, TaskState.SUCCEEDED);
+
+    // verify that the job succeeds at the second attempt
+    app.waitForState(jobAttempt2, JobState.SUCCEEDED);
+  }
+
   /**
    * This test case primarily verifies if the recovery is controlled through config
    * property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
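For completeness, the spill-encryption guard this test exercises is the
existing CryptoUtils helper that MRAppMaster now consults, so the same check
can be reproduced outside the AM (sketch, assuming
org.apache.hadoop.mapreduce.CryptoUtils and the MapReduce client jars on the
classpath):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.CryptoUtils;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SpillEncryptionCheck {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
        // Mirrors the new guard in shouldAttemptRecovery(): with reducers
        // present and this returning true, the AM will not attempt recovery.
        System.out.println(CryptoUtils.isEncryptedSpillEnabled(conf)); // true
      }
    }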