Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 63928DF29 for ; Tue, 25 Sep 2012 18:27:16 +0000 (UTC) Received: (qmail 40666 invoked by uid 500); 25 Sep 2012 18:27:16 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 40620 invoked by uid 500); 25 Sep 2012 18:27:16 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 40613 invoked by uid 99); 25 Sep 2012 18:27:16 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Sep 2012 18:27:16 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 25 Sep 2012 18:27:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id C979423888CD for ; Tue, 25 Sep 2012 18:26:28 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1390042 - in /hadoop/common/branches/branch-1.1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/ Date: Tue, 25 Sep 2012 18:26:28 -0000 To: common-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120925182628.C979423888CD@eris.apache.org> Author: acmurthy Date: Tue Sep 25 18:26:28 2012 New Revision: 1390042 URL: http://svn.apache.org/viewvc?rev=1390042&view=rev Log: Merge -c 1390041 from branch-1 to branch-1.1 to fix MAPREDUCE-4603. Add support for JobClient to retry job-submission when JobTracker is in SafeMode. Contributed by Arun C. Murthy. Added: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java - copied unchanged from r1390041, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java - copied unchanged from r1390041, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java Modified: hadoop/common/branches/branch-1.1/CHANGES.txt hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Modified: hadoop/common/branches/branch-1.1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390042&r1=1390041&r2=1390042&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 18:26:28 2012 @@ -187,14 +187,15 @@ Release 1.1.0 - 2012.09.16 MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. (Todd Lipcon, backport by Brandon Li via sseth) - HDFS-3667. Add retry support to WebHdfsFileSystem. (szetszwo) - HADOOP-8748. Refactor DFSClient retry utility methods to a new class in org.apache.hadoop.io.retry. Contributed by Arun C Murthy. HDFS-3871. Change DFSClient to use RetryUtils. (Arun C Murthy via szetszwo) + MAPREDUCE-4603. Add support for JobClient to retry job-submission when + JobTracker is in SafeMode. (acmurthy) + BUG FIXES HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1390042&r1=1390041&r2=1390042&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Sep 25 18:26:28 2012 @@ -40,6 +40,7 @@ import java.security.PrivilegedException import java.util.Arrays; import java.util.Collection; import java.util.Comparator; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.Counters.Counter; @@ -428,7 +433,9 @@ public class JobClient extends Configure } } + private JobSubmissionProtocol rpcJobSubmitClient; private JobSubmissionProtocol jobSubmitClient; + private Path sysDir = null; private Path stagingAreaDir = null; @@ -439,6 +446,15 @@ public class JobClient extends Configure private static final int DEFAULT_TASKLOG_TIMEOUT = 60000; static int tasklogtimeout; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY = + "mapreduce.jobclient.retry.policy.enabled"; + public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = + false; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY = + "mapreduce.jobclient.retry.policy.spec"; + public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT = + "10000,6,60000,10"; //t1,n1,t2,n2,... + /** * Create a job client. */ @@ -471,16 +487,61 @@ public class JobClient extends Configure conf.setNumMapTasks(1); this.jobSubmitClient = new LocalJobRunner(conf); } else { - this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf); + this.rpcJobSubmitClient = + createRPCProxy(JobTracker.getAddress(conf), conf); + this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf); } } private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr, Configuration conf) throws IOException { - return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class, - JobSubmissionProtocol.versionID, addr, - UserGroupInformation.getCurrentUser(), conf, - NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class)); + + JobSubmissionProtocol rpcJobSubmitClient = + (JobSubmissionProtocol)RPC.getProxy( + JobSubmissionProtocol.class, + JobSubmissionProtocol.versionID, addr, + UserGroupInformation.getCurrentUser(), conf, + NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), + 0, + RetryUtils.getMultipleLinearRandomRetry( + conf, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT + ) + ); + + return rpcJobSubmitClient; + } + + private static JobSubmissionProtocol createProxy( + JobSubmissionProtocol rpcJobSubmitClient, + Configuration conf) throws IOException { + + RetryPolicy defaultPolicy = + RetryUtils.getDefaultRetryPolicy( + conf, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY, + MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT, + SafeModeException.class + ); + + /* + * Method specific retry policies for killJob and killTask... + * + * No retries on any exception including + * ConnectionException and SafeModeException + */ + Map methodNameToPolicyMap = + new HashMap(); + methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL); + methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL); + + return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class, + rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap); } @InterfaceAudience.Private @@ -496,7 +557,7 @@ public class JobClient extends Configure public long renew(Token token, Configuration conf ) throws IOException, InterruptedException { InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - JobSubmissionProtocol jt = createRPCProxy(addr, conf); + JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf); return jt.renewDelegationToken((Token) token); } @@ -505,7 +566,7 @@ public class JobClient extends Configure public void cancel(Token token, Configuration conf ) throws IOException, InterruptedException { InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token); - JobSubmissionProtocol jt = createRPCProxy(addr, conf); + JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf); jt.cancelDelegationToken((Token) token); } @@ -531,15 +592,16 @@ public class JobClient extends Configure public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException { this.ugi = UserGroupInformation.getCurrentUser(); - jobSubmitClient = createRPCProxy(jobTrackAddr, conf); + rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); + jobSubmitClient = createProxy(rpcJobSubmitClient, conf); } /** * Close the JobClient. */ public synchronized void close() throws IOException { - if (!(jobSubmitClient instanceof LocalJobRunner)) { - RPC.stopProxy(jobSubmitClient); + if (!(rpcJobSubmitClient instanceof LocalJobRunner)) { + RPC.stopProxy(rpcJobSubmitClient); } } Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390042&r1=1390041&r2=1390042&view=diff ============================================================================== --- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 18:26:28 2012 @@ -3695,9 +3695,8 @@ public class JobTracker implements MRCon public JobStatus submitJob(JobID jobId, String jobSubmitDir, UserGroupInformation ugi, Credentials ts, boolean recovered) throws IOException { - if (isInSafeMode()) { - throw new IOException("JobTracker in safemode"); - } + // Check for safe-mode + checkSafeMode(); JobInfo jobInfo = null; if (ugi == null) { @@ -3780,6 +3779,9 @@ public class JobTracker implements MRCon * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir() */ public String getStagingAreaDir() throws IOException { + // Check for safe-mode + checkSafeMode(); + try{ final String user = UserGroupInformation.getCurrentUser().getShortUserName(); @@ -3896,6 +3898,9 @@ public class JobTracker implements MRCon return; } + // No 'killJob' in safe-mode + checkSafeMode(); + JobInProgress job = jobs.get(jobid); if (null == job) { @@ -4351,6 +4356,9 @@ public class JobTracker implements MRCon */ public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail) throws IOException { + // No 'killTask' in safe-mode + checkSafeMode(); + TaskInProgress tip = taskidToTIPMap.get(taskid); if(tip != null) { // check both queue-level and job-level access @@ -5242,4 +5250,15 @@ public class JobTracker implements MRCon return "ON - " + safeModeInfo + ""; } + private void checkSafeMode() throws SafeModeException { + if (isInSafeMode()) { + try { + throw new SafeModeException(( + isInAdminSafeMode()) ? adminSafeModeUser : null); + } catch (SafeModeException sfe) { + LOG.info("JobTracker in safe-mode, aborting operation", sfe); + throw sfe; + } + } + } }