hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r556746 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/
Date Mon, 16 Jul 2007 21:59:38 GMT
Author: cutting
Date: Mon Jul 16 14:59:34 2007
New Revision: 556746

URL: http://svn.apache.org/viewvc?view=rev&rev=556746
Log:
HADOOP-1400.  Make JobClient retry requests, so that clients survive jobtracker problems.
 Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 14:59:34 2007
@@ -367,6 +367,9 @@
      This reduces the namenode's memory requirements and increases
      data integrity.  (Raghu Angadi via cutting)
 
+115. HADOOP-1400.  Make JobClient retry requests, so that clients can
+     survive jobtracker problems.  (omalley via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Mon Jul 16 14:59:34
2007
@@ -21,6 +21,7 @@
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.io.retry.*;
 import org.apache.hadoop.ipc.*;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
@@ -201,32 +202,52 @@
   public JobClient() {
   }
     
-  public JobClient(Configuration conf) throws IOException {
+  public JobClient(JobConf conf) throws IOException {
     setConf(conf);
-    init();
+    init(conf);
   }
     
-  public void init() throws IOException {
+  public void init(JobConf conf) throws IOException {
     String tracker = conf.get("mapred.job.tracker", "local");
     if ("local".equals(tracker)) {
       this.jobSubmitClient = new LocalJobRunner(conf);
     } else {
-      this.jobSubmitClient = (JobSubmissionProtocol) 
-        RPC.getProxy(JobSubmissionProtocol.class,
-                     JobSubmissionProtocol.versionID,
-                     JobTracker.getAddress(conf), conf);
+      this.jobSubmitClient = createProxy(JobTracker.getAddress(conf), conf);
     }        
   }
-  
+
   /**
-   * Build a job client, connect to the indicated job tracker.
+   * Create a proxy JobSubmissionProtocol that retries timeouts.
+   * @param addr the address to connect to
+   * @param conf the server's configuration
+   * @return a proxy object that will retry timeouts
+   * @throws IOException
    */
-  public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException
{
-    this.jobSubmitClient = (JobSubmissionProtocol) 
+  private JobSubmissionProtocol createProxy(InetSocketAddress addr,
+                                            Configuration conf
+                                            ) throws IOException {
+    JobSubmissionProtocol raw = (JobSubmissionProtocol) 
       RPC.getProxy(JobSubmissionProtocol.class,
-                   JobSubmissionProtocol.versionID, jobTrackAddr, conf);
+                   JobSubmissionProtocol.versionID, addr, conf);
+    RetryPolicy backoffPolicy =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep
+      (5, 10, java.util.concurrent.TimeUnit.SECONDS);
+    Map<Class<? extends Exception>, RetryPolicy> handlers = 
+      new HashMap<Class<? extends Exception>, RetryPolicy>();
+    handlers.put(SocketTimeoutException.class, backoffPolicy);
+    RetryPolicy backoffTimeOuts = 
+      RetryPolicies.retryByException(RetryPolicies.TRY_ONCE_THEN_FAIL,handlers);
+    return (JobSubmissionProtocol)
+      RetryProxy.create(JobSubmissionProtocol.class, raw, backoffTimeOuts);
   }
 
+  /**
+   * Build a job client, connect to the indicated job tracker.
+   */
+  public JobClient(InetSocketAddress jobTrackAddr, 
+                   Configuration conf) throws IOException {
+    jobSubmitClient = createProxy(jobTrackAddr, conf);
+  }
 
   /**
    */
@@ -270,15 +291,15 @@
     //
 
     // Create a number of filenames in the JobTracker's fs namespace
-    Path submitJobDir = new Path(job.getSystemDir(), "submit_" + 
-                                 Integer.toString(r.nextInt(Integer.MAX_VALUE), 
-                                                  36));
+    String jobId = jobSubmitClient.getNewJobId();
+    Path submitJobDir = new Path(job.getSystemDir(), jobId);
+    FileSystem fs = getFs();
+    LOG.debug("default FileSystem: " + fs.getUri());
+    fs.delete(submitJobDir);    
     Path submitJobFile = new Path(submitJobDir, "job.xml");
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
         
-    FileSystem fs = getFs();
-    LOG.debug("default FileSystem: " + fs.getUri());
     // try getting the md5 of the archives
     URI[] tarchives = DistributedCache.getCacheArchives(job);
     URI[] tfiles = DistributedCache.getCacheFiles(job);
@@ -379,7 +400,7 @@
     //
     // Now, actually submit the job (using the submit name)
     //
-    JobStatus status = jobSubmitClient.submitJob(submitJobFile.toString());
+    JobStatus status = jobSubmitClient.submitJob(jobId);
     if (status != null) {
       return new NetworkedJob(status);
     } else {
@@ -723,9 +744,6 @@
       throw new RuntimeException("JobClient:" + cmd);
     }
 
-    // initialize JobClient
-    init();
-        
     // Process args
     String submitJobFile = null;
     String jobid = null;
@@ -751,11 +769,20 @@
       }
     }
 
+    // initialize JobClient
+    JobConf conf = null;
+    if (submitJobFile != null) {
+      conf = new JobConf(submitJobFile);
+    } else {
+      conf = new JobConf();
+    }
+    init(conf);
+        
     // Submit the request
     int exitCode = -1;
     try {
       if (submitJobFile != null) {
-        RunningJob job = submitJob(submitJobFile);
+        RunningJob job = submitJob(conf);
         System.out.println("Created job " + job.getJobID());
       } else if (getStatus) {
         RunningJob job = getJob(jobid);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jul 16 14:59:34
2007
@@ -29,7 +29,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
@@ -115,28 +114,28 @@
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
-  public JobInProgress(String jobFile, JobTracker jobtracker, 
-                       Configuration default_conf) throws IOException {
-    jobId = jobtracker.getTrackerIdentifier() + "_" +jobtracker.createJobId();
-    String fullJobId = "job_" + jobId;
+  public JobInProgress(String jobid, JobTracker jobtracker, 
+                       JobConf default_conf) throws IOException {
+    this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
-        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + fullJobId;
+        + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
     this.jobtracker = jobtracker;
-    this.status = new JobStatus(fullJobId, 0.0f, 0.0f, JobStatus.PREP);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.startTime = System.currentTimeMillis();
     this.localFs = (LocalFileSystem)FileSystem.getLocal(default_conf);
 
     JobConf default_job_conf = new JobConf(default_conf);
     this.localJobFile = default_job_conf.getLocalPath(JobTracker.SUBDIR 
-                                                      +"/"+fullJobId + ".xml");
+                                                      +"/"+jobid + ".xml");
     this.localJarFile = default_job_conf.getLocalPath(JobTracker.SUBDIR
-                                                      +"/"+ fullJobId + ".jar");
+                                                      +"/"+ jobid + ".jar");
     FileSystem fs = FileSystem.get(default_conf);
-    fs.copyToLocalFile(new Path(jobFile), localJobFile);
+    Path jobFile = new Path(default_conf.getSystemDir(), jobid + "/job.xml");
+    fs.copyToLocalFile(jobFile, localJobFile);
     conf = new JobConf(localJobFile);
     this.priority = conf.getJobPriority();
-    this.profile = new JobProfile(conf.getUser(), fullJobId, jobFile, url,
-                                  conf.getJobName());
+    this.profile = new JobProfile(conf.getUser(), jobid, 
+                                  jobFile.toString(), url, jobid);
     String jarFile = conf.getJar();
     if (jarFile != null) {
       fs.copyToLocalFile(new Path(jarFile), localJarFile);
@@ -151,15 +150,16 @@
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
-    JobHistory.JobInfo.logSubmitted(fullJobId, conf.getJobName(), conf.getUser(), 
-                                    System.currentTimeMillis(), jobFile); 
+    JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
+                                    System.currentTimeMillis(), 
+                                    jobFile.toString()); 
         
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", fullJobId);
+    this.jobMetrics.setTag("jobId", jobid);
   }
 
   /**
@@ -1076,8 +1076,8 @@
         
       // Delete temp dfs dirs created if any, like in case of 
       // speculative exn of reduces.  
-      String tempDir = conf.get("mapred.system.dir") + "/job_" + jobId; 
-      fs.delete(new Path(tempDir)); 
+      Path tempDir = new Path(conf.getSystemDir(), jobId); 
+      fs.delete(tempDir); 
 
     } catch (IOException e) {
       LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Mon Jul
16 14:59:34 2007
@@ -33,11 +33,20 @@
    *changed
    */
   public static final long versionID = 3L;
+
+  /**
+   * Allocate a name for the job.
+   * @return a unique job name for submitting jobs.
+   * @throws IOException
+   */
+  public String getNewJobId() throws IOException;
+
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
+   * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
-  public JobStatus submitJob(String jobFile) throws IOException;
+  public JobStatus submitJob(String jobName) throws IOException;
 
   /**
    * Get the current status of the cluster

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jul 16 14:59:34
2007
@@ -99,7 +99,7 @@
    * @param conf configuration for the JobTracker.
    * @throws IOException
    */
-  public static void startTracker(Configuration conf) throws IOException {
+  public static void startTracker(JobConf conf) throws IOException {
     if (tracker != null)
       throw new IOException("JobTracker already running.");
     runTracker = true;
@@ -604,12 +604,12 @@
   static final String SUBDIR = "jobTracker";
   FileSystem fs;
   Path systemDir;
-  private Configuration conf;
+  private JobConf conf;
 
   /**
    * Start the JobTracker process, listen on the indicated port
    */
-  JobTracker(Configuration conf) throws IOException {
+  JobTracker(JobConf conf) throws IOException {
     //
     // Grab some static constants
     //
@@ -1441,9 +1441,27 @@
     LOG.warn("Report from " + taskTracker + ": " + errorMessage);        
   }
 
+  /**
+   * Remove the job_ from jobids to get the unique string.
+   */
+  static String getJobUniqueString(String jobid) {
+    return jobid.substring(4);
+  }
+
   ////////////////////////////////////////////////////
   // JobSubmissionProtocol
   ////////////////////////////////////////////////////
+
+  /**
+   * Allocates a new JobId string.
+   */
+  public String getNewJobId() {
+    synchronized (this) {
+      return "job_" + getTrackerIdentifier() + "_" + 
+             idFormat.format(nextJobId++);
+    }
+  }
+
   /**
    * JobTracker.submitJob() kicks off a new job.  
    *
@@ -1677,12 +1695,6 @@
   public JobInProgress getJob(String jobid) {
     return jobs.get(jobid);
   }
-  /**
-   * Grab random num for job id
-   */
-  String createJobId() {
-    return idFormat.format(nextJobId++);
-  }
 
   ////////////////////////////////////////////////////
   // Methods to track all the TaskTrackers
@@ -1773,8 +1785,7 @@
     }
       
     try {
-      Configuration conf=new Configuration();
-      startTracker(conf);
+      startTracker(new JobConf());
     } catch (Throwable e) {
       LOG.fatal(StringUtils.stringifyException(e));
       System.exit(-1);

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Mon Jul 16 14:59:34
2007
@@ -26,7 +26,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.*;
@@ -39,7 +38,7 @@
 
   private FileSystem fs;
   private HashMap<String, Job> jobs = new HashMap<String, Job>();
-  private Configuration conf;
+  private JobConf conf;
   private int map_tasks = 0;
   private int reduce_tasks = 0;
 
@@ -51,7 +50,7 @@
   
   private class Job extends Thread
     implements TaskUmbilicalProtocol {
-    private String file;
+    private Path file;
     private String id;
     private JobConf job;
     private Random random = new Random();
@@ -74,18 +73,18 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public Job(String file, Configuration conf) throws IOException {
-      this.file = file;
-      this.id = "job_" + newId();
+    public Job(String jobid, JobConf conf) throws IOException {
+      this.file = new Path(conf.getSystemDir(), jobid + "/job.xml");
+      this.id = jobid;
       this.mapoutputFile = new MapOutputFile();
       this.mapoutputFile.setConf(conf);
 
       this.localFile = new JobConf(conf).getLocalPath("localRunner/"+id+".xml");
       this.localFs = FileSystem.getLocal(conf);
 
-      fs.copyToLocalFile(new Path(file), localFile);
+      fs.copyToLocalFile(file, localFile);
       this.job = new JobConf(localFile);
-      profile = new JobProfile(job.getUser(), id, file, 
+      profile = new JobProfile(job.getUser(), id, file.toString(), 
                                "http://localhost:8080/", job.getJobName());
       status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
 
@@ -119,7 +118,7 @@
           splits[i].write(buffer);
           BytesWritable split = new BytesWritable();
           split.set(buffer.getData(), 0, buffer.getLength());
-          MapTask map = new MapTask(jobId, file, "tip_m_" + mapId, 
+          MapTask map = new MapTask(jobId, file.toString(), "tip_m_" + mapId, 
                                     mapId, i,
                                     splits[i].getClass().getName(),
                                     split);
@@ -152,8 +151,9 @@
             }
 
             {
-              ReduceTask reduce = new ReduceTask(jobId, file, "tip_r_0001",
-                  reduceId, 0, mapIds.size());
+              ReduceTask reduce = new ReduceTask(jobId, file.toString(), 
+                                                 "tip_r_0001",
+                                                 reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
               reduce.localizeConfiguration(localConf);
               reduce.setConf(localConf);
@@ -187,7 +187,7 @@
 
       } finally {
         try {
-          fs.delete(new Path(file).getParent());  // delete submit dir
+          fs.delete(file.getParent());  // delete submit dir
           localFs.delete(localFile);              // delete local copy
         } catch (IOException e) {
           LOG.warn("Error cleaning up "+id+": "+e);
@@ -258,7 +258,7 @@
     
   }
 
-  public LocalJobRunner(Configuration conf) throws IOException {
+  public LocalJobRunner(JobConf conf) throws IOException {
     this.fs = FileSystem.get(conf);
     this.conf = conf;
     myMetrics = new JobTrackerMetrics(new JobConf(conf));
@@ -266,8 +266,13 @@
 
   // JobSubmissionProtocol methods
 
-  public JobStatus submitJob(String jobFile) throws IOException {
-    return new Job(jobFile, this.conf).status;
+  private int jobid = 0;
+  public String getNewJobId() {
+    return "job_local_" + Integer.toString(++jobid);
+  }
+
+  public JobStatus submitJob(String jobid) throws IOException {
+    return new Job(jobid, this.conf).status;
   }
 
   public void killJob(String id) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=556746&r1=556745&r2=556746
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jul 16 14:59:34
2007
@@ -114,7 +114,7 @@
   /**
    * Constructor for MapTask
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         String splitClass, BytesWritable split, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
@@ -126,13 +126,13 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
         
   /**
    * Constructor for ReduceTask
    */
-  public TaskInProgress(String uniqueString, String jobFile, 
+  public TaskInProgress(String jobid, String jobFile, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
                         JobInProgress job) {
@@ -143,7 +143,7 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    init(uniqueString);
+    init(JobTracker.getJobUniqueString(jobid));
   }
   /**
    * Set the max number of attempts before we declare a TIP as "failed"



Mime
View raw message