hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r788619 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/ sr...
Date Fri, 26 Jun 2009 07:52:41 GMT
Author: sharad
Date: Fri Jun 26 07:52:40 2009
New Revision: 788619

URL: http://svn.apache.org/viewvc?rev=788619&view=rev
Log:
MAPREDUCE-463. Makes job setup and cleanup tasks optional. Contributed by Amareshwari Sriramadasu.

Added:
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/mapred-default.xml
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Jun 26 07:52:40 2009
@@ -27,6 +27,9 @@
     HADOOP-5968. Sqoop should only print a warning about mysql import speed
     once. (Aaron Kimball via tomwhite)
 
+    MAPREDUCE-463. Makes job setup and cleanup tasks as optional.
+    (Amareshwari Sriramadasu via sharad)
+
   BUG FIXES
     HADOOP-4687. MapReduce is split from Hadoop Core. It is a subproject under 
     Hadoop (Owen O'Malley)

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Jun 26 07:52:40 2009
@@ -959,7 +959,7 @@
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues);
+          jobQueuesManager,schedConf,queues, taskTrackerManager);
     }
     initializationPoller.init(queueManager.getQueues(), schedConf);
     initializationPoller.setDaemon(true);

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Jun 26 07:52:40 2009
@@ -137,19 +137,12 @@
           LOG.info("Initializing job : " + job.getJobID() + " in Queue "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getUser());
-          try {
-            if (startIniting) {
-              setInitializingJob(job);
-              job.initTasks();
-              setInitializingJob(null);
-            } else {
-              break;
-            }
-          } catch (Throwable t) {
-            LOG.info("Job initialization failed:\n"
-                + StringUtils.stringifyException(t));
-            jobQueueManager.removeJobFromWaitingQueue(job);
-            job.fail(); 
+          if (startIniting) {
+            setInitializingJob(job);
+            ttm.initJob(job);
+            setInitializingJob(null);
+          } else {
+            break;
           }
         }
       }
@@ -246,6 +239,7 @@
 
   private volatile boolean running;
 
+  private TaskTrackerManager ttm;
   /**
    * The map which provides information which thread should be used to
    * initialize jobs for a given job queue.
@@ -253,13 +247,15 @@
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
 
   public JobInitializationPoller(JobQueuesManager mgr,
-      CapacitySchedulerConf rmConf, Set<String> queue) {
+      CapacitySchedulerConf rmConf, Set<String> queue, 
+      TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
     threadsToQueueMap = new HashMap<String, JobInitializationThread>();
     super.setName("JobInitializationPollerThread");
     running = true;
+    this.ttm = ttm;
   }
 
   /*

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Jun 26 07:52:40 2009
@@ -99,8 +99,9 @@
     
     public ControlledInitializationPoller(JobQueuesManager mgr,
                                           CapacitySchedulerConf rmConf,
-                                          Set<String> queues) {
-      super(mgr, rmConf, queues);
+                                          Set<String> queues,
+                                          TaskTrackerManager ttm) {
+      super(mgr, rmConf, queues, ttm);
     }
     
     @Override
@@ -503,6 +504,22 @@
       job.kill();
     }
 
+    public void initJob(JobInProgress jip) {
+      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+      try {
+        jip.initTasks();
+      } catch (IOException ioe) {
+        jip.fail();
+      }
+      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+      JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
+        EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+       for (JobInProgressListener listener : listeners) {
+         listener.jobUpdated(event);
+       }
+    }
+    
+
     public void removeJob(JobID jobid) {
       jobs.remove(jobid);
     }
@@ -749,7 +766,7 @@
     controlledInitializationPoller = new ControlledInitializationPoller(
         scheduler.jobQueuesManager,
         resConf,
-        resConf.getQueues());
+        resConf.getQueues(), taskTrackerManager);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
     //by default disable speculative execution.
@@ -777,7 +794,7 @@
   private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
       throws IOException {
     FakeJobInProgress j = submitJob(state, jobConf);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
   }
 
@@ -797,21 +814,10 @@
                                              String queue, String user) 
   throws IOException {
     FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
-    scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
+    taskTrackerManager.initJob(j);
     return j;
   }
   
-  // Note that there is no concept of setup tasks here. So init itself should 
-  // report the job-status change
-  private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip) 
-  throws IOException {
-    JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-    jip.initTasks();
-    JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-    return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED, 
-                                    oldStatus, newStatus);
-  }
-  
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
@@ -838,16 +844,10 @@
     // first (may be because of the setup tasks).
     
     // init the lower ranked job first
-    JobChangeEvent event = initTasksAndReportEvent(fjob2);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob2);
     
     // init the higher ordered job later
-    event = initTasksAndReportEvent(fjob1);
-    
-    // inform the scheduler
-    scheduler.jobQueuesManager.jobUpdated(event);
+    taskTrackerManager.initJob(fjob1);
     
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/PriorityScheduler.java Fri Jun 26 07:52:40 2009
@@ -65,11 +65,7 @@
 
     @Override
     public void run() {
-      try {
-        job.initTasks();
-      } catch (IOException io) {
-        LOG.error(io);
-      }
+      taskTrackerManager.initJob(job);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java Fri Jun 26 07:52:40 2009
@@ -98,6 +98,8 @@
     public JobInProgress getJob(JobID jobid) {
       return null;
     }
+    public void initJob(JobInProgress job) {
+    }
     public void startTask(String taskTrackerName, final Task t) {
     }
     void addQueues(String[] arr) {

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Jun 26 07:52:40 2009
@@ -106,6 +106,7 @@
     try {
       Configuration conf = getConf();
       this.eagerInitListener = new EagerTaskInitializationListener(conf);
+      eagerInitListener.setTaskTrackerManager(taskTrackerManager);
       eagerInitListener.start();
       taskTrackerManager.addJobInProgressListener(eagerInitListener);
       taskTrackerManager.addJobInProgressListener(jobListener);

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Jun 26 07:52:40 2009
@@ -194,6 +194,10 @@
       return null;
     }
 
+    public void initJob (JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/mapred-default.xml?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/mapred-default.xml (original)
+++ hadoop/mapreduce/trunk/src/java/mapred-default.xml Fri Jun 26 07:52:40 2009
@@ -26,6 +26,13 @@
   </description>
 </property>
 
+<property>
+  <name>mapred.committer.job.setup.cleanup.needed</name>
+  <value>true</value>
+  <description> true, if job needs job-setup and job-cleanup.
+                false, otherwise  
+  </description>
+</property>
 <!-- i/o properties -->
 
 <property>

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/EagerTaskInitializationListener.java Fri Jun 26 07:52:40 2009
@@ -29,7 +29,6 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
-import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link JobInProgressListener} which initializes the tasks for a job as soon
@@ -67,7 +66,7 @@
     }
   }
   
-  static class InitJob implements Runnable {
+  class InitJob implements Runnable {
   
     private JobInProgress job;
     
@@ -76,16 +75,7 @@
     }
     
     public void run() {
-      try {
-        LOG.info("Initializing " + job.getJobID());
-        job.initTasks();
-      } catch (Throwable t) {
-        LOG.error("Job initialization failed:\n" +
-            StringUtils.stringifyException(t));
-        if (job != null) {
-          job.fail();
-        }
-      }
+      ttm.initJob(job);
     }
   }
   
@@ -94,12 +84,17 @@
   private List<JobInProgress> jobInitQueue = new ArrayList<JobInProgress>();
   private ExecutorService threadPool;
   private int numThreads;
+  private TaskTrackerManager ttm;
   
   public EagerTaskInitializationListener(Configuration conf) {
     numThreads = conf.getInt("mapred.jobinit.threads", DEFAULT_NUM_THREADS);
     threadPool = Executors.newFixedThreadPool(numThreads);
   }
   
+  public void setTaskTrackerManager(TaskTrackerManager ttm) {
+    this.ttm = ttm;
+  }
+  
   public void start() throws IOException {
     this.jobInitManagerThread = new Thread(jobInitManager, "jobInitManager");
     jobInitManagerThread.setDaemon(true);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Jun 26 07:52:40 2009
@@ -42,6 +42,7 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -110,6 +111,7 @@
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
+  private boolean jobSetupCleanupNeeded = true;
 
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
@@ -350,6 +352,8 @@
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
+    JobContext jobContext = new JobContext(conf, jobId);
+    this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
@@ -601,7 +605,33 @@
           (conf.getFloat("mapred.reduce.slowstart.completed.maps", 
                          DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
            numMapTasks));
+    
+    initSetupCleanupTasks(jobFile);
+    
+    synchronized(jobInitKillStatus){
+      jobInitKillStatus.initDone = true;
+      if(jobInitKillStatus.killed) {
+        //setup not launched so directly terminate
+        terminateJob(JobStatus.KILLED);
+        return;
+      }
+    }
+    
+    tasksInited.set(true);
+    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
+                                 numMapTasks, numReduceTasks);
+    
+    // if setup is not needed, mark it complete
+    if (!jobSetupCleanupNeeded) {
+      setupComplete();
+    }
+  }
 
+  private void initSetupCleanupTasks(String jobFile) {
+    if (!jobSetupCleanupNeeded) {
+      // nothing to initialize
+      return;
+    }
     // create cleanup two cleanup tips, one map and one reduce.
     cleanup = new TaskInProgress[2];
 
@@ -630,19 +660,18 @@
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
-    
-    synchronized(jobInitKillStatus){
-      jobInitKillStatus.initDone = true;
-      if(jobInitKillStatus.killed) {
-        //setup not launched so directly terminate
-        terminateJob(JobStatus.KILLED);
-        return;
-      }
+  }
+  
+  private void setupComplete() {
+    status.setSetupProgress(1.0f);
+    if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) {
+      jobComplete();
+      return;
+    }
+    if (this.status.getRunState() == JobStatus.PREP) {
+      this.status.setRunState(JobStatus.RUNNING);
+      JobHistory.JobInfo.logStarted(profile.getJobID());
     }
-    
-    tasksInited.set(true);
-    JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
-                                 numMapTasks, numReduceTasks);
   }
 
   /////////////////////////////////////////////////////
@@ -2446,13 +2475,7 @@
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
-      // Job can start running now.
-      this.status.setSetupProgress(1.0f);
-      // move the job to running state if the job is in prep state
-      if (this.status.getRunState() == JobStatus.PREP) {
-        this.status.setRunState(JobStatus.RUNNING);
-        JobHistory.JobInfo.logStarted(profile.getJobID());
-      }
+      setupComplete();
     } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
@@ -2503,6 +2526,10 @@
       }
     }
     decrementSpeculativeCount(wasSpeculating, tip);
+    // is job complete?
+    if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
+      jobComplete();
+    }
     return true;
   }
   
@@ -2560,7 +2587,8 @@
     //
     // All tasks are complete, then the job is done!
     //
-    if (this.status.getRunState() == JobStatus.RUNNING ) {
+    if (this.status.getRunState() == JobStatus.RUNNING ||
+        this.status.getRunState() == JobStatus.PREP) {
       this.status.setRunState(JobStatus.SUCCEEDED);
       this.status.setCleanupProgress(1.0f);
       if (maps.length == 0) {
@@ -2657,6 +2685,9 @@
       
       // Clear out reserved tasktrackers
       cancelReservedSlots();
+      if (!jobSetupCleanupNeeded) {
+        terminateJob(jobTerminationState);
+      }
     }
   }
 
@@ -2918,7 +2949,9 @@
   }
 
   boolean isSetupFinished() {
-    if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
+    // if there is no setup to be launched, consider setup is finished.  
+    if ((tasksInited.get() && setup.length == 0) || 
+        setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
         || setup[1].isFailed()) {
       return true;
     }
@@ -3019,10 +3052,12 @@
    */
   public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
     if (tipid.getTaskType() == TaskType.MAP) {
-      if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
+      // cleanup map tip
+      if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
         return cleanup[0]; 
       }
-      if (tipid.equals(setup[0].getTIPId())) { //setup map tip
+      // setup map tip
+      if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { 
         return setup[0];
       }
       for (int i = 0; i < maps.length; i++) {
@@ -3031,10 +3066,12 @@
         }
       }
     } else {
-      if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
+      // cleanup reduce tip
+      if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { 
         return cleanup[1]; 
       }
-      if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
+      // setup reduce tip
+      if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { 
         return setup[1];
       }
       for (int i = 0; i < reduces.length; i++) {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Jun 26 07:52:40 2009
@@ -48,7 +48,7 @@
   public synchronized void start() throws IOException {
     super.start();
     taskTrackerManager.addJobInProgressListener(jobQueueJobInProgressListener);
-    
+    eagerTaskInitializationListener.setTaskTrackerManager(taskTrackerManager);
     eagerTaskInitializationListener.start();
     taskTrackerManager.addJobInProgressListener(
         eagerTaskInitializationListener);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Jun 26 07:52:40 2009
@@ -855,14 +855,7 @@
         if (Values.PREP.name().equals(jobStatus)) {
           hasUpdates = true;
           LOG.info("Calling init from RM for job " + jip.getJobID().toString());
-          try {
-            jip.initTasks();
-          } catch (Throwable t) {
-            LOG.error("Job initialization failed : \n" 
-                      + StringUtils.stringifyException(t));
-            jip.fail(); // fail the job
-            throw new IOException(t);
-          }
+          initJob(jip);
         }
       }
       
@@ -3298,6 +3291,41 @@
     }
   }
 
+  public void initJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Init on null job is not valid");
+      return;
+    }
+	        
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+    try {
+      LOG.info("Initializing " + job.getJobID());
+      job.initTasks();
+    } catch (Throwable t) {
+      LOG.error("Job initialization failed:\n" +
+          StringUtils.stringifyException(t));
+      if (job != null) {
+        job.fail();
+      }
+    }
+	    
+    // Inform the listeners if the job state has changed
+    // Note : 
+    //   If the job initialization is failed, job state will be FAILED
+    //   If job was killed during initialization, job state will be KILLED
+    //   If job does not require setup, job state will be RUNNING
+    //   otherwise, job state is PREP.
+    JobStatus newStatus = (JobStatus)job.getStatus().clone();
+    if (prevStatus.getRunState() != newStatus.getRunState()) {
+      JobStatusChangeEvent event = 
+        new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+            newStatus);
+      synchronized (JobTracker.this) {
+        updateJobInProgressListeners(event);
+      }
+    }
+  }
+
   /**
    * Set the priority of a job
    * @param jobid id of the job

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java Fri Jun 26 07:52:40 2009
@@ -88,4 +88,11 @@
    * @return jobInProgress object
    */
   public JobInProgress getJob(JobID jobid);
+  
+  /**
+   * Initialize the Job
+   * 
+   * @param job JobInProgress object
+   */
+  public void initJob(JobInProgress job);
 }

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/Job.java Fri Jun 26 07:52:40 2009
@@ -290,6 +290,18 @@
   }
 
   /**
+   * Specify whether job-setup and job-cleanup is needed for the job 
+   * 
+   * @param needed If <code>true</code>, job-setup and job-cleanup will be
+   *               considered from {@link OutputCommitter} 
+   *               else ignored.
+   */
+  public void setJobSetupCleanupNeeded(boolean needed) {
+    ensureState(JobState.DEFINE);
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", needed);
+  }
+
+  /**
    * Get the URL where some job progress information will be displayed.
    * 
    * @return the URL where some job progress information will be displayed.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapreduce/JobContext.java Fri Jun 26 07:52:40 2009
@@ -233,4 +233,15 @@
   public RawComparator<?> getGroupingComparator() {
     return conf.getOutputValueGroupingComparator();
   }
+  
+  /**
+   * Get whether job-setup and job-cleanup is needed for the job 
+   * 
+   * @return boolean 
+   */
+  public boolean getJobSetupCleanupNeeded() {
+    return conf.getBoolean("mapred.committer.job.setup.cleanup.needed", true);
+  }
+
+
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/MiniMRCluster.java Fri Jun 26 07:52:40 2009
@@ -514,7 +514,7 @@
    */
   public void initializeJob(JobID jobId) throws IOException {
     JobInProgress job = jobTracker.getJobTracker().getJob(jobId);
-    job.initTasks();
+    jobTracker.getJobTracker().initJob(job);
   }
   
   /**

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Jun 26 07:52:40 2009
@@ -198,6 +198,10 @@
       return null;
     }
 
+    public void initJob(JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobTrackerRestart.java Fri Jun 26 07:52:40 2009
@@ -467,7 +467,7 @@
     JobID id = job2.getID();*/
     JobInProgress jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     // find out the history filename
     String history = 
@@ -482,7 +482,7 @@
     id = job1.getID();
     jip = mr.getJobTrackerRunner().getJobTracker().getJob(id);
     
-    jip.initTasks();
+    mr.getJobTrackerRunner().getJobTracker().initJob(jip);
     
     //  make sure that cleanup is launched and is waiting
     while (!jip.isCleanupLaunched()) {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Jun 26 07:52:40 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 public class TestParallelInitialization extends TestCase {
   
@@ -135,6 +136,23 @@
       return null;
     }
 
+    public void initJob(JobInProgress job) {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      try {
+        job.initTasks();
+      } catch (IOException ioe) {
+        job.fail();
+      }
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        for (JobInProgressListener listener : listeners) {
+          listener.jobUpdated(event);
+        }
+      }
+    }
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java?rev=788619&r1=788618&r2=788619&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/MapReduceTestUtil.java Fri Jun 26 07:52:40 2009
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.mapreduce;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.text.NumberFormat;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Random;
 
 import org.apache.hadoop.conf.Configuration;
@@ -228,4 +228,29 @@
       }
     }
   }
+
+  public static Job createJob(Configuration conf, Path inDir, Path outDir, 
+      int numInputFiles, int numReds) throws IOException {
+    Job job = new Job(conf);
+    FileSystem fs = FileSystem.get(conf);
+    if (fs.exists(outDir)) {
+      fs.delete(outDir, true);
+    }
+    if (fs.exists(inDir)) {
+      fs.delete(inDir, true);
+    }
+    fs.mkdirs(inDir);
+    String input = "The quick brown fox\n" + "has many silly\n"
+      + "red fox sox\n";
+    for (int i = 0; i < numInputFiles; ++i) {
+      DataOutputStream file = fs.create(new Path(inDir, "part-" + i));
+      file.writeBytes(input);
+      file.close();
+    }    
+
+    FileInputFormat.setInputPaths(job, inDir);
+    FileOutputFormat.setOutputPath(job, outDir);
+    job.setNumReduceTasks(numReds);
+    return job;
+  }
 }

Added: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java?rev=788619&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java (added)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapreduce/TestNoJobSetupCleanup.java Fri Jun 26 07:52:40 2009
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
+import org.apache.hadoop.mapred.HadoopTestCase;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+
+public class TestNoJobSetupCleanup extends HadoopTestCase {
+  private static String TEST_ROOT_DIR =
+    new File(System.getProperty("test.build.data","/tmp"))
+    .toURI().toString().replace(' ', '+');
+  private final Path inDir = new Path(TEST_ROOT_DIR, "./wc/input");
+  private final Path outDir = new Path(TEST_ROOT_DIR, "./wc/output");
+
+  public TestNoJobSetupCleanup() throws IOException {
+    super(HadoopTestCase.CLUSTER_MR , HadoopTestCase.LOCAL_FS, 2, 2);
+  }
+
+  private Job submitAndValidateJob(Configuration conf, int numMaps, int numReds) 
+      throws IOException, InterruptedException, ClassNotFoundException {
+    Job job = MapReduceTestUtil.createJob(conf, inDir, outDir, 
+                numMaps, numReds);
+    job.setJobSetupCleanupNeeded(false);
+    job.setOutputFormatClass(MyOutputFormat.class);
+    job.waitForCompletion(true);
+    assertTrue(job.isSuccessful());
+    JobID jobid = (org.apache.hadoop.mapred.JobID)job.getID();
+    JobClient jc = new JobClient(conf);
+    assertTrue(jc.getSetupTaskReports(jobid).length == 0);
+    assertTrue(jc.getCleanupTaskReports(jobid).length == 0);
+    assertTrue(jc.getMapTaskReports(jobid).length == numMaps);
+    assertTrue(jc.getReduceTaskReports(jobid).length == numReds);
+    FileSystem fs = FileSystem.get(conf);
+    assertTrue("Job output directory doesn't exit!", fs.exists(outDir));
+    FileStatus[] list = fs.listStatus(outDir, new OutputFilter());
+    int numPartFiles = numReds == 0 ? numMaps : numReds;
+    assertTrue("Number of part-files is " + list.length + " and not "
+        + numPartFiles, list.length == numPartFiles);
+    return job;
+  }
+  
+  public void testNoJobSetupCleanup() throws Exception {
+    try {
+      Configuration conf = createJobConf();
+ 
+      // run a job without job-setup and cleanup
+      submitAndValidateJob(conf, 1, 1);
+
+      // run a map only job.
+      submitAndValidateJob(conf, 1, 0);
+
+      // run empty job without job setup and cleanup 
+      submitAndValidateJob(conf, 0, 0);
+
+      // run empty job without job setup and cleanup, with non-zero reduces 
+      submitAndValidateJob(conf, 0, 1);
+    } finally {
+      tearDown();
+    }
+  }
+  
+  static class MyOutputFormat extends TextOutputFormat {
+    public void checkOutputSpecs(JobContext job) 
+        throws FileAlreadyExistsException, IOException{
+      super.checkOutputSpecs(job);
+      // creating dummy TaskAttemptID
+      TaskAttemptID tid = new TaskAttemptID("jt", 1, TaskType.JOB_SETUP, 0, 0);
+      getOutputCommitter(new TaskAttemptContext(job.getConfiguration(), tid)).
+        setupJob(job);
+    }
+  }
+
+  private static class OutputFilter implements PathFilter {
+    public boolean accept(Path path) {
+      return !(path.getName().startsWith("_"));
+    }
+  }
+}



Mime
View raw message