hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r803049 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/jav...
Date Tue, 11 Aug 2009 10:07:03 GMT
Author: ddas
Date: Tue Aug 11 10:07:02 2009
New Revision: 803049

URL: http://svn.apache.org/viewvc?rev=803049&view=rev
Log:
MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact the JobTracker lock
hierarchy wasn't maintained in some JobInProgress method calls. Contributed by Amar Kamat.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Aug 11 10:07:02 2009
@@ -347,3 +347,7 @@
 
     MAPREDUCE-813. Updates Streaming and M/R tutorial documents.
     (Corinne Chandel via ddas)
+
+    MAPREDUCE-805. Fixes some deadlocks in the JobTracker due to the fact
+    the JobTracker lock hierarchy wasn't maintained in some JobInProgress
+    method calls. (Amar Kamat via ddas)

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Tue Aug 11 10:07:02 2009
@@ -35,6 +35,8 @@
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobInProgress;
+import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.conf.Configuration;
@@ -505,20 +507,35 @@
     }
 
     public void initJob(JobInProgress jip) {
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
       try {
+        JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
         jip.initTasks();
-      } catch (IOException ioe) {
-        jip.fail();
+        completeEmptyJob(jip);
+        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
+        JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
+          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+         for (JobInProgressListener listener : listeners) {
+           listener.jobUpdated(event);
+         }
+      } catch (Exception ioe) {
+        failJob(jip);
       }
+    }
+
+    private synchronized void completeEmptyJob(JobInProgress jip) {
+      jip.completeEmptyJob();
+    }
+
+    public synchronized void failJob(JobInProgress jip) {
+      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
+      jip.fail();
       JobStatus newStatus = (JobStatus)jip.getStatus().clone();
       JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
-        EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-       for (JobInProgressListener listener : listeners) {
-         listener.jobUpdated(event);
-       }
+          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
+      for (JobInProgressListener listener : listeners) {
+        listener.jobUpdated(event);
+      }
     }
-    
 
     public void removeJob(JobID jobid) {
       jobs.remove(jobid);

Modified: hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
Tue Aug 11 10:07:02 2009
@@ -100,6 +100,8 @@
     }
     public void initJob(JobInProgress job) {
     }
+    public void failJob(JobInProgress job) {
+    }
     public void startTask(String taskTrackerName, final Task t) {
     }
     public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
(original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Tue Aug 11 10:07:02 2009
@@ -351,6 +351,10 @@
       // do nothing
     }
     
+    public void failJob (JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Aug 11
10:07:02 2009
@@ -42,7 +42,6 @@
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobHistory.Values;
-import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -65,6 +64,16 @@
  * This is NOT a public interface!
  */
 public class JobInProgress {
+  /**
+   * Used when the a kill is issued to a job which is initializing.
+   */
+  static class KillInterruptedException extends InterruptedException {
+   private static final long serialVersionUID = 1L;
+    public KillInterruptedException(String msg) {
+      super(msg);
+    }
+  }
+  
   static final Log LOG = LogFactory.getLog(JobInProgress.class);
     
   JobProfile profile;
@@ -106,7 +115,7 @@
   private volatile boolean launchedSetup = false;
   private volatile boolean jobKilled = false;
   private volatile boolean jobFailed = false;
-  private boolean jobSetupCleanupNeeded = true;
+  private final boolean jobSetupCleanupNeeded;
 
   JobPriority priority = JobPriority.NORMAL;
   protected JobTracker jobtracker;
@@ -517,14 +526,17 @@
 
   /**
    * Construct the splits, etc.  This is invoked from an async
-   * thread so that split-computation doesn't block anyone.
-   */
-  public synchronized void initTasks() throws IOException {
-    if (tasksInited.get()) {
+   * thread so that split-computation doesn't block anyone. Only the 
+   * {@link JobTracker} should invoke this api. Look 
+   * at {@link JobTracker#initJob(JobInProgress)} for more details.
+   */
+  public synchronized void initTasks() 
+  throws IOException, KillInterruptedException {
+    if (tasksInited.get() || isComplete()) {
       return;
     }
     synchronized(jobInitKillStatus){
-      if(jobInitKillStatus.killed) {
+      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
         return;
       }
       jobInitKillStatus.initStarted = true;
@@ -575,19 +587,33 @@
       jobInitKillStatus.initDone = true;
       if(jobInitKillStatus.killed) {
         //setup not launched so directly terminate
-        terminateJob(JobStatus.KILLED);
-        return;
+        throw new KillInterruptedException("Job " + jobId + " killed in init");
       }
     }
     
     tasksInited.set(true);
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
-    
-    // if setup is not needed, mark it complete
-    if (!jobSetupCleanupNeeded) {
-      setupComplete();
-    }
+  }
+
+  // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
+  // else return false.
+  synchronized boolean isJobEmpty() {
+    return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
+  }
+  
+  synchronized boolean isSetupCleanupRequired() {
+   return jobSetupCleanupNeeded;
+  }
+
+  // Should be called once the init is done. This will complete the job 
+  // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
+  synchronized void completeEmptyJob() {
+    jobComplete();
+  }
+
+  synchronized void completeSetup() {
+    setupComplete();
   }
 
   void logToJobHistory() throws IOException {
@@ -684,10 +710,6 @@
   
   private void setupComplete() {
     status.setSetupProgress(1.0f);
-    if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded)
{
-      jobComplete();
-      return;
-    }
     if (this.status.getRunState() == JobStatus.PREP) {
       this.status.setRunState(JobStatus.RUNNING);
       JobHistory.JobInfo.logStarted(profile.getJobID());
@@ -752,15 +774,14 @@
   }
   public void setPriority(JobPriority priority) {
     if(priority == null) {
-      this.priority = JobPriority.NORMAL;
-    } else {
-      this.priority = priority;
+      priority = JobPriority.NORMAL;
     }
     synchronized (this) {
+      this.priority = priority;
       status.setJobPriority(priority);
+      // log and change to the job's priority
+      JobHistory.JobInfo.logJobPriority(jobId, priority);
     }
-    // log and change to the job's priority
-    JobHistory.JobInfo.logJobPriority(jobId, priority);
   }
 
   // Update the job start/launch time (upon restart) and log to history
@@ -1252,7 +1273,7 @@
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
       return null;
     }
     
@@ -1333,7 +1354,7 @@
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
       return null;
     }
     
@@ -2751,15 +2772,12 @@
   }
 
   /**
-   * Kill the job and all its component tasks. This method is called from 
+   * Kill the job and all its component tasks. This method should be called from 
    * jobtracker and should return fast as it locks the jobtracker.
    */
   public void kill() {
     boolean killNow = false;
     synchronized(jobInitKillStatus) {
-      if(jobInitKillStatus.killed) {//job is already marked for killing
-        return;
-      }
       jobInitKillStatus.killed = true;
       //if not in middle of init, terminate it now
       if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
@@ -2773,7 +2791,12 @@
   }
   
   /**
-   * Fails the job and all its component tasks.
+   * Fails the job and all its component tasks. This should be called only from
+   * {@link JobInProgress} or {@link JobTracker}. Look at 
+   * {@link JobTracker#failJob(JobInProgress)} for more details.
+   * Note that the job doesnt expect itself to be failed before its inited. 
+   * Only when the init is done (successfully or otherwise), the job can be 
+   * failed. 
    */
   synchronized void fail() {
     terminate(JobStatus.FAILED);

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Aug 11 10:07:02
2009
@@ -64,6 +64,7 @@
 import org.apache.hadoop.mapred.JobHistory.Keys;
 import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
@@ -3505,8 +3506,13 @@
       return;
     }
         
-    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     checkAccess(job, Queue.QueueOperation.ADMINISTER_JOBS);
+    killJob(job);
+  }
+
+  private synchronized void killJob(JobInProgress job) {
+    LOG.info("Killing job " + job.getJobID());
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     job.kill();
     
     // Inform the listeners if the job is killed
@@ -3525,38 +3531,87 @@
     }
   }
 
+  /**
+   * Initialize a job and inform the listeners about a state change, if any.
+   * Other components in the framework should use this api to initialize a job.
+   */
   public void initJob(JobInProgress job) {
     if (null == job) {
       LOG.info("Init on null job is not valid");
       return;
     }
 	        
-    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
     try {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
       LOG.info("Initializing " + job.getJobID());
       job.initTasks();
+      // Here the job *should* be in the PREP state.
+      // From here there are 3 ways :
+      //  - job requires setup : the job remains in PREP state and 
+      //    setup is launched to move the job in RUNNING state
+      //  - job is complete (no setup required and no tasks) : complete 
+      //    the job and move it to SUCCEEDED
+      //  - job has tasks but doesnt require setup : make the job RUNNING.
+      if (job.isJobEmpty()) { // is the job empty?
+        completeEmptyJob(job); // complete it
+      } else if (!job.isSetupCleanupRequired()) { // setup/cleanup not required
+        job.completeSetup(); // complete setup and make job running
+      }
+      // Inform the listeners if the job state has changed
+      // Note : 
+      //   If job does not require setup, job state will be RUNNING
+      //   If job is configured with 0 maps, 0 reduces and no setup-cleanup then 
+      //   the job state will be SUCCEEDED
+      //   otherwise, job state is PREP.
+      JobStatus newStatus = (JobStatus)job.getStatus().clone();
+      if (prevStatus.getRunState() != newStatus.getRunState()) {
+        JobStatusChangeEvent event = 
+          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+              newStatus);
+        synchronized (JobTracker.this) {
+          updateJobInProgressListeners(event);
+        }
+      }
+    } catch (KillInterruptedException kie) {
+      //   If job was killed during initialization, job state will be KILLED
+      LOG.error("Job initialization interrupted :\n" +
+          StringUtils.stringifyException(kie));
+      killJob(job);
     } catch (Throwable t) {
+      //    If the job initialization is failed, job state will be FAILED
       LOG.error("Job initialization failed:\n" +
           StringUtils.stringifyException(t));
-      if (job != null) {
-        job.fail();
-      }
+      failJob(job);
+    }
+  }
+
+  // This simply marks the job as completed. Note that the caller is responsible
+  // for raising events.
+  private synchronized void completeEmptyJob(JobInProgress job) {
+    job.completeEmptyJob();
+  }
+  
+  /**
+   * Fail a job and inform the listeners. Other components in the framework 
+   * should use this to fail a job.
+   */
+  public synchronized void failJob(JobInProgress job) {
+    if (null == job) {
+      LOG.info("Fail on null job is not valid");
+      return;
     }
-	    
+         
+    JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+    LOG.info("Failing job " + job.getJobID());
+    job.fail();
+     
     // Inform the listeners if the job state has changed
-    // Note : 
-    //   If the job initialization is failed, job state will be FAILED
-    //   If job was killed during initialization, job state will be KILLED
-    //   If job does not require setup, job state will be RUNNING
-    //   otherwise, job state is PREP.
     JobStatus newStatus = (JobStatus)job.getStatus().clone();
     if (prevStatus.getRunState() != newStatus.getRunState()) {
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
             newStatus);
-      synchronized (JobTracker.this) {
-        updateJobInProgressListeners(event);
-      }
+      updateJobInProgressListeners(event);
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Aug
11 10:07:02 2009
@@ -105,4 +105,11 @@
    * @param job JobInProgress object
    */
   public void initJob(JobInProgress job);
+  
+  /**
+   * Fail a job.
+   * 
+   * @param job JobInProgress object
+   */
+  public void failJob(JobInProgress job);
 }

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
Tue Aug 11 10:07:02 2009
@@ -99,7 +99,7 @@
     }
     
     @Override
-    void createMapTasks(String ignored, JobClient.RawSplit[] splits) {
+    protected void createMapTasks(String ignored, JobClient.RawSplit[] splits) {
       maps = new TaskInProgress[numMapTasks];
       for (int i = 0; i < numMapTasks; i++) {
         maps[i] = new TaskInProgress(getJobID(), "test", 
@@ -108,7 +108,7 @@
     }
 
     @Override
-    void createReduceTasks(String ignored) {
+    protected void createReduceTasks(String ignored) {
       reduces = new TaskInProgress[numReduceTasks];
       for (int i = 0; i < numReduceTasks; i++) {
         reduces[i] = new TaskInProgress(getJobID(), "test", 

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
Tue Aug 11 10:07:02 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
@@ -207,6 +208,10 @@
       // do nothing
     }
     
+    public void failJob(JobInProgress job) {
+      // do nothing
+    }
+    
     // Test methods
     
     public void submitJob(JobInProgress job) throws IOException {

Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=803049&r1=803048&r2=803049&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
(original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java
Tue Aug 11 10:07:02 2009
@@ -27,6 +27,7 @@
 import junit.framework.TestCase;
 
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 public class TestParallelInitialization extends TestCase {
@@ -141,12 +142,33 @@
     }
 
     public void initJob(JobInProgress job) {
-      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
       try {
+        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
         job.initTasks();
+        completeEmptyJob(job);
+        JobStatus newStatus = (JobStatus)job.getStatus().clone();
+        if (prevStatus.getRunState() != newStatus.getRunState()) {
+          JobStatusChangeEvent event = 
+            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus, 
+                newStatus);
+          for (JobInProgressListener listener : listeners) {
+            listener.jobUpdated(event);
+          }
+        }
+      } catch (KillInterruptedException kie) {
+        killJob(job.getJobID());
       } catch (IOException ioe) {
-        job.fail();
+        failJob(job);
       }
+    }
+
+    private synchronized void completeEmptyJob(JobInProgress job) {
+      job.completeEmptyJob();
+    }
+
+    public synchronized void failJob(JobInProgress job) {
+      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
+      job.fail();
       JobStatus newStatus = (JobStatus)job.getStatus().clone();
       if (prevStatus.getRunState() != newStatus.getRunState()) {
         JobStatusChangeEvent event = 



Mime
View raw message