hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r386224 - in /lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred: JobConf.java JobInProgress.java TaskInProgress.java
Date Thu, 16 Mar 2006 00:01:43 GMT
Author: cutting
Date: Wed Mar 15 16:01:42 2006
New Revision: 386224

URL: http://svn.apache.org/viewcvs?rev=386224&view=rev
Log:
Fix for HADOOP-81.  Job-specific parameters should be read from the job-specific configuration,
not the daemon's.  This permits speculative execution, number of map & reduce tasks, etc.
to be settable in the job.  Contributed by Owen O'Malley.

Modified:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Wed Mar 15 16:01:42 2006
@@ -244,6 +244,22 @@
     setClass("mapred.combiner.class", theClass, Reducer.class);
   }
   
+  /**
+   * Should speculative execution be used for this job?
+   * @return Defaults to true
+   */
+  public boolean getSpeculativeExecution() { 
+    return getBoolean("mapred.speculative.execution", true);
+  }
+  
+  /**
+   * Turn on or off speculative execution for this job.
+   * In general, it should be turned off for map jobs that have side effects.
+   */
+  public void setSpeculativeExecution(boolean new_val) {
+    setBoolean("mapred.speculative.execution", new_val);
+  }
+  
   public int getNumMapTasks() { return getInt("mapred.map.tasks", 1); }
   public void setNumMapTasks(int n) { setInt("mapred.map.tasks", n); }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Mar 15 16:01:42 2006
@@ -50,43 +50,46 @@
     long finishTime;
     String deleteUponCompletion = null;
 
-    Configuration conf;
+    private JobConf conf;
     boolean tasksInited = false;
 
     /**
      * Create a JobInProgress with the given job file, plus a handle
      * to the tracker.
      */
-    public JobInProgress(String jobFile, JobTracker jobtracker, Configuration conf) throws IOException {
+    public JobInProgress(String jobFile, JobTracker jobtracker, 
+                         Configuration default_conf) throws IOException {
         String jobid = "job_" + jobtracker.createUniqueId();
         String url = "http://" + jobtracker.getJobTrackerMachine() + ":" + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + jobid;
-        this.conf = conf;
         this.jobtracker = jobtracker;
         this.profile = new JobProfile(jobid, jobFile, url);
         this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
         this.startTime = System.currentTimeMillis();
 
-        this.localJobFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".xml");
-        this.localJarFile = new JobConf(conf).getLocalFile(JobTracker.SUBDIR, jobid + ".jar");
-        FileSystem fs = FileSystem.get(conf);
+        JobConf default_job_conf = new JobConf(default_conf);
+        this.localJobFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+            jobid + ".xml");
+        this.localJarFile = default_job_conf.getLocalFile(JobTracker.SUBDIR, 
+            jobid + ".jar");
+        FileSystem fs = FileSystem.get(default_conf);
         fs.copyToLocalFile(new File(jobFile), localJobFile);
 
-        JobConf jd = new JobConf(localJobFile);
+        conf = new JobConf(localJobFile);
 
-        String jarFile = jd.getJar();
+        String jarFile = conf.getJar();
         if (jarFile != null) {
           fs.copyToLocalFile(new File(jarFile), localJarFile);
-          jd.setJar(localJarFile.getCanonicalPath());
+          conf.setJar(localJarFile.getCanonicalPath());
         }
 
-        this.numMapTasks = jd.getNumMapTasks();
-        this.numReduceTasks = jd.getNumReduceTasks();
+        this.numMapTasks = conf.getNumMapTasks();
+        this.numReduceTasks = conf.getNumReduceTasks();
 
         //
         // If a jobFile is in the systemDir, we can delete it (and
         // its JAR) upon completion
         //
-        File systemDir = jd.getSystemDir();
+        File systemDir = conf.getSystemDir();
         if (jobFile.startsWith(systemDir.getPath())) {
             this.deleteUponCompletion = jobFile;
         }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=386224&r1=386223&r2=386224&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Mar 15 16:01:42 2006
@@ -47,37 +47,37 @@
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.mapred.TaskInProgress");
 
     // Defines the TIP
-    String jobFile = null;
-    FileSplit split = null;
-    String hints[][] = null;
-    TaskInProgress predecessors[] = null;
-    int partition;
-    JobTracker jobtracker;
-    String id;
-    String totalTaskIds[];
-    JobInProgress job;
+    private String jobFile = null;
+    private FileSplit split = null;
+    private String hints[][] = null;
+    private TaskInProgress predecessors[] = null;
+    private int partition;
+    private JobTracker jobtracker;
+    private String id;
+    private String totalTaskIds[];
+    private JobInProgress job;
 
     // Status of the TIP
-    int numTaskFailures = 0;
-    double progress = 0;
-    String state = "";
-    long startTime = 0;
-    int completes = 0;
-    boolean failed = false;
-    TreeSet usableTaskIds = new TreeSet();
-    TreeSet recentTasks = new TreeSet();
-    Configuration conf;
+    private int numTaskFailures = 0;
+    private double progress = 0;
+    private String state = "";
+    private long startTime = 0;
+    private int completes = 0;
+    private boolean failed = false;
+    private TreeSet usableTaskIds = new TreeSet();
+    private TreeSet recentTasks = new TreeSet();
+    private JobConf conf;
     
-    TreeMap taskDiagnosticData = new TreeMap();
-    TreeMap taskStatuses = new TreeMap();
+    private TreeMap taskDiagnosticData = new TreeMap();
+    private TreeMap taskStatuses = new TreeMap();
 
-    TreeSet machinesWhereFailed = new TreeSet();
-    TreeSet tasksReportedClosed = new TreeSet();
+    private TreeSet machinesWhereFailed = new TreeSet();
+    private TreeSet tasksReportedClosed = new TreeSet();
 
     /**
      * Constructor for MapTask
      */
-    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, FileSplit split, JobTracker jobtracker, JobConf conf, JobInProgress job) {
         this.jobFile = jobFile;
         this.split = split;
         this.jobtracker = jobtracker;
@@ -89,7 +89,7 @@
     /**
      * Constructor for ReduceTask
      */
-    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, Configuration conf, JobInProgress job) {
+    public TaskInProgress(String jobFile, TaskInProgress predecessors[], int partition, JobTracker jobtracker, JobConf conf, JobInProgress job) {
         this.jobFile = jobFile;
         this.predecessors = predecessors;
         this.partition = partition;
@@ -408,7 +408,7 @@
         //
         if (isMapTask() &&
             recentTasks.size() <= MAX_TASK_EXECS &&
-            conf.getBoolean("mapred.speculative.execution", true) &&
+            conf.getSpeculativeExecution() &&
             (averageProgress - progress >= SPECULATIVE_GAP) &&
             (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)) {
             return true;



Mime
View raw message