hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1390042 - in /hadoop/common/branches/branch-1.1: ./ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
Date Tue, 25 Sep 2012 18:26:28 GMT
Author: acmurthy
Date: Tue Sep 25 18:26:28 2012
New Revision: 1390042

URL: http://svn.apache.org/viewvc?rev=1390042&view=rev
Log:
Merge -c 1390041 from branch-1 to branch-1.1 to fix MAPREDUCE-4603. Add support for JobClient
to retry job-submission when JobTracker is in SafeMode. Contributed by Arun C. Murthy.

Added:
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java
      - copied unchanged from r1390041, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/SafeModeException.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
      - copied unchanged from r1390041, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobClientRetries.java
Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390042&r1=1390041&r2=1390042&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 18:26:28 2012
@@ -187,14 +187,15 @@ Release 1.1.0 - 2012.09.16
     MAPREDUCE-782. Use PureJavaCrc32 in mapreduce spills. 
     (Todd Lipcon, backport by Brandon Li via sseth)
 
-    HDFS-3667.  Add retry support to WebHdfsFileSystem.  (szetszwo)
-
     HADOOP-8748. Refactor DFSClient retry utility methods to a new class in
     org.apache.hadoop.io.retry.  Contributed by Arun C Murthy.
 
     HDFS-3871. Change DFSClient to use RetryUtils.  (Arun C Murthy
     via szetszwo)
 
+    MAPREDUCE-4603. Add support for JobClient to retry job-submission when
+    JobTracker is in SafeMode. (acmurthy)
+
   BUG FIXES
 
     HDFS-3696. Set chunked streaming mode in WebHdfsFileSystem write operations

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java?rev=1390042&r1=1390041&r2=1390042&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobClient.java Tue Sep 25 18:26:28 2012
@@ -40,6 +40,7 @@ import java.security.PrivilegedException
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
@@ -59,6 +60,10 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.Counters.Counter;
@@ -428,7 +433,9 @@ public class JobClient extends Configure
     }
   }
 
+  private JobSubmissionProtocol rpcJobSubmitClient;
   private JobSubmissionProtocol jobSubmitClient;
+  
   private Path sysDir = null;
   private Path stagingAreaDir = null;
   
@@ -439,6 +446,15 @@ public class JobClient extends Configure
   private static final int DEFAULT_TASKLOG_TIMEOUT = 60000;
   static int tasklogtimeout;
 
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY =
+      "mapreduce.jobclient.retry.policy.enabled";
+  public static final boolean MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT = 
+      false;
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY =
+      "mapreduce.jobclient.retry.policy.spec";
+  public static final String MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT =
+      "10000,6,60000,10"; //t1,n1,t2,n2,...
+  
   /**
    * Create a job client.
    */
@@ -471,16 +487,61 @@ public class JobClient extends Configure
       conf.setNumMapTasks(1);
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.rpcJobSubmitClient = 
+          createRPCProxy(JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(this.rpcJobSubmitClient, conf);
     }        
   }
 
   private static JobSubmissionProtocol createRPCProxy(InetSocketAddress addr,
       Configuration conf) throws IOException {
-    return (JobSubmissionProtocol) RPC.getProxy(JobSubmissionProtocol.class,
-        JobSubmissionProtocol.versionID, addr, 
-        UserGroupInformation.getCurrentUser(), conf,
-        NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class));
+    
+    JobSubmissionProtocol rpcJobSubmitClient = 
+        (JobSubmissionProtocol)RPC.getProxy(
+            JobSubmissionProtocol.class,
+            JobSubmissionProtocol.versionID, addr, 
+            UserGroupInformation.getCurrentUser(), conf,
+            NetUtils.getSocketFactory(conf, JobSubmissionProtocol.class), 
+            0,
+            RetryUtils.getMultipleLinearRandomRetry(
+                conf,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+                MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT
+                )
+            );
+    
+    return rpcJobSubmitClient;
+  }
+
+  private static JobSubmissionProtocol createProxy(
+      JobSubmissionProtocol rpcJobSubmitClient,
+      Configuration conf) throws IOException {
+
+    RetryPolicy defaultPolicy = 
+        RetryUtils.getDefaultRetryPolicy(
+            conf,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_KEY,
+            MAPREDUCE_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+            SafeModeException.class
+            ); 
+    
+    /* 
+     * Method specific retry policies for killJob and killTask...
+     * 
+     * No retries on any exception including 
+     * ConnectionException and SafeModeException
+     */
+    Map<String,RetryPolicy> methodNameToPolicyMap = 
+        new HashMap<String,RetryPolicy>();
+    methodNameToPolicyMap.put("killJob", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    methodNameToPolicyMap.put("killTask", RetryPolicies.TRY_ONCE_THEN_FAIL);
+    
+    return (JobSubmissionProtocol) RetryProxy.create(JobSubmissionProtocol.class,
+        rpcJobSubmitClient, defaultPolicy, methodNameToPolicyMap);
   }
 
   @InterfaceAudience.Private
@@ -496,7 +557,7 @@ public class JobClient extends Configure
     public long renew(Token<?> token, Configuration conf
                       ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       return jt.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -505,7 +566,7 @@ public class JobClient extends Configure
     public void cancel(Token<?> token, Configuration conf
                        ) throws IOException, InterruptedException {
       InetSocketAddress addr = SecurityUtil.getTokenServiceAddr(token);
-      JobSubmissionProtocol jt = createRPCProxy(addr, conf);
+      JobSubmissionProtocol jt = createProxy(createRPCProxy(addr, conf), conf);
       jt.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
     }
 
@@ -531,15 +592,16 @@ public class JobClient extends Configure
   public JobClient(InetSocketAddress jobTrackAddr, 
                    Configuration conf) throws IOException {
     this.ugi = UserGroupInformation.getCurrentUser();
-    jobSubmitClient = createRPCProxy(jobTrackAddr, conf);
+    rpcJobSubmitClient = createRPCProxy(jobTrackAddr, conf); 
+    jobSubmitClient = createProxy(rpcJobSubmitClient, conf);
   }
 
   /**
    * Close the <code>JobClient</code>.
    */
   public synchronized void close() throws IOException {
-    if (!(jobSubmitClient instanceof LocalJobRunner)) {
-      RPC.stopProxy(jobSubmitClient);
+    if (!(rpcJobSubmitClient instanceof LocalJobRunner)) {
+      RPC.stopProxy(rpcJobSubmitClient);
     }
   }
 

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390042&r1=1390041&r2=1390042&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 18:26:28 2012
@@ -3695,9 +3695,8 @@ public class JobTracker implements MRCon
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
-    if (isInSafeMode()) {
-      throw new IOException("JobTracker in safemode");
-    }
+    // Check for safe-mode
+    checkSafeMode();
     
     JobInfo jobInfo = null;
     if (ugi == null) {
@@ -3780,6 +3779,9 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getStagingAreaDir()
    */
   public String getStagingAreaDir() throws IOException {
+    // Check for safe-mode
+    checkSafeMode();
+
     try{
       final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
@@ -3896,6 +3898,9 @@ public class JobTracker implements MRCon
       return;
     }
     
+    // No 'killJob' in safe-mode
+    checkSafeMode();
+    
     JobInProgress job = jobs.get(jobid);
     
     if (null == job) {
@@ -4351,6 +4356,9 @@ public class JobTracker implements MRCon
    */
   public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
       throws IOException {
+    // No 'killTask' in safe-mode
+    checkSafeMode();
+
     TaskInProgress tip = taskidToTIPMap.get(taskid);
     if(tip != null) {
       // check both queue-level and job-level access
@@ -5242,4 +5250,15 @@ public class JobTracker implements MRCon
     return "<em>ON - " + safeModeInfo + "</em>";
   }
   
+  private void checkSafeMode() throws SafeModeException {
+    if (isInSafeMode()) {
+      try {
+        throw new SafeModeException((
+            isInAdminSafeMode()) ? adminSafeModeUser : null);
+      } catch (SafeModeException sfe) {
+        LOG.info("JobTracker in safe-mode, aborting operation", sfe);
+        throw sfe;
+      }
+    }
+  }
 }



Mime
View raw message