hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sha...@apache.org
Subject svn commit: r1134246 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/...
Date Fri, 10 Jun 2011 09:16:37 GMT
Author: sharad
Date: Fri Jun 10 09:16:37 2011
New Revision: 1134246

URL: http://svn.apache.org/viewvc?rev=1134246&view=rev
Log:
MAPREDUCE-2582. Cleanup JobHistory event generation. Contributed by Siddharth Seth.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Fri Jun 10 09:16:37 2011
@@ -4,7 +4,9 @@ Trunk (unreleased changes)
 
 
     MAPREDUCE-279
-    
+
+    MAPREDUCE-2582. Cleanup JobHistory event generation. (Siddharth Seth via sharad)
+ 
     Add ability to includes src files in assembly target for maven (Luke Lu via mahadev)
  
     Added few job diagnostic messages. (sharad)

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
Fri Jun 10 09:16:37 2011
@@ -365,7 +365,7 @@ public class JobHistoryEventHandler exte
       throw new YarnException(e);
     }
     // check for done
-    if (event.getHistoryEvent().getEventType().equals(EventType.JOB_FINISHED)) {
+    if (event.getHistoryEvent().getEventType() == EventType.JOB_FINISHED) {
       try {
           JobFinishedEvent jFinishedEvent = (JobFinishedEvent) event
               .getHistoryEvent();
@@ -378,6 +378,19 @@ public class JobHistoryEventHandler exte
         throw new YarnException(e);
       }
     }
+      if (event.getHistoryEvent().getEventType() == EventType.JOB_FAILED
+          || event.getHistoryEvent().getEventType() == EventType.JOB_KILLED) {
+        try {
+          JobUnsuccessfulCompletionEvent jucEvent = (JobUnsuccessfulCompletionEvent) event
+              .getHistoryEvent();
+          mi.getJobIndexInfo().setFinishTime(jucEvent.getFinishTime());
+          mi.getJobIndexInfo().setNumMaps(jucEvent.getFinishedMaps());
+          mi.getJobIndexInfo().setNumReduces(jucEvent.getFinishedReduces());
+          closeEventWriter(event.getJobID());
+        } catch (IOException e) {
+          throw new YarnException(e);
+        }
+      }
   }
   }
 
@@ -433,6 +446,11 @@ public class JobHistoryEventHandler exte
     if (mi == null) {
       throw new IOException("No MetaInfo found for JobId: [" + jobId + "]");
     }
+    if (!mi.isWriterActive()) {
+      throw new IOException(
+          "Inactive Writer: Likely received multiple JobFinished / JobUnsuccessful events
for JobId: ["
+              + jobId + "]");
+    }
     try {
         mi.closeWriter();
     } catch (IOException e) {
@@ -523,6 +541,8 @@ public class JobHistoryEventHandler exte
 
     JobSummary getJobSummary() { return jobSummary; }
 
+    boolean isWriterActive() {return writer != null ; }
+
     void closeWriter() throws IOException {
       synchronized (lock) {
       if (writer != null) {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
Fri Jun 10 09:16:37 2011
@@ -36,6 +36,7 @@ import java.util.concurrent.locks.Reentr
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -51,6 +52,7 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
@@ -333,6 +335,7 @@ public class JobImpl implements org.apac
   private int failedReduceTaskCount = 0;
   private int killedMapTaskCount = 0;
   private int killedReduceTaskCount = 0;
+  private long submitTime;
   private long startTime;
   private long finishTime;
   private float setupProgress;
@@ -709,7 +712,7 @@ public class JobImpl implements org.apac
      */
     @Override
     public JobState transition(JobImpl job, JobEvent event) {
-      job.startTime = job.clock.getTime();
+      job.submitTime = job.clock.getTime();
       job.metrics.submittedJob(job);
       job.metrics.preparingJob(job);
       try {
@@ -717,14 +720,14 @@ public class JobImpl implements org.apac
         job.fs = FileSystem.get(job.conf);
 
         //log to job history
-        //TODO_JH_Validate the values being sent here (along with defaults). Ideally for
all JH events.
-        JobSubmittedEvent jse =
-          new JobSubmittedEvent(job.oldJobId, 
+        JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
               job.conf.get(MRJobConfig.JOB_NAME, "test"), 
-              job.conf.get(MRJobConfig.USER_NAME,"mapred"), job.startTime,
-              job.remoteJobConfFile.toString(), job.jobACLs, 
-              job.conf.get(MRJobConfig.QUEUE_NAME,"test"));
+            job.conf.get(MRJobConfig.USER_NAME, "mapred"),
+            job.submitTime,
+            job.remoteJobConfFile.toString(),
+            job.jobACLs, job.conf.get(MRJobConfig.QUEUE_NAME, "test"));
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
+        //TODO JH Verify jobACLs, UserName via UGI?
 
         TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
         job.numMapTasks = taskSplitMetaInfo.length;
@@ -868,6 +871,7 @@ public class JobImpl implements org.apac
 
         job.metrics.endPreparingJob(job);
         return JobState.INITED;
+        //TODO XXX Should JobInitedEvent be generated here (instead of in StartTransition)
 
       } catch (Exception e) {
         LOG.warn("Job init failed", e);
@@ -1025,8 +1029,11 @@ public class JobImpl implements org.apac
              job.startTime,
              job.numMapTasks, job.numReduceTasks,
              job.isUber, 0, 0,  // FIXME: lose latter two args again (old-style uber junk:
 needs to go along with 98% of other old-style uber junk)
-             JobState.NEW.toString());
+             job.getState().toString()); //Will transition to state running. Currently in
INITED
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
+      JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
+          job.submitTime, job.startTime);
+      job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
       job.metrics.runningJob(job);
     }
   }
@@ -1038,26 +1045,29 @@ public class JobImpl implements org.apac
     } catch (IOException e) {
       LOG.warn("Could not abortJob", e);
     }
+    if (finishTime == 0) setFinishTime();
     cleanupProgress = 1.0f;
     JobUnsuccessfulCompletionEvent unsuccessfulJobEvent =
       new JobUnsuccessfulCompletionEvent(oldJobId,
           finishTime,
           succeededMapTaskCount,
-          numReduceTasks, //TODO finishedReduceTasks
+          succeededReduceTaskCount,
           finalState.toString());
     eventHandler.handle(new JobHistoryEvent(jobId, unsuccessfulJobEvent));
+  }
     
-    JobFinishedEvent jfe =
-      new JobFinishedEvent(oldJobId,
-          finishTime,
-          succeededMapTaskCount,
-          succeededReduceTaskCount, failedMapTaskCount,
-          failedReduceTaskCount,
-          TypeConverter.fromYarn(getCounters()), //TODO replace with MapCounter
-          TypeConverter.fromYarn(getCounters()), // TODO reduceCounters
-          TypeConverter.fromYarn(getCounters()));
-    eventHandler.handle(new JobHistoryEvent(jobId, jfe));
-    //TODO Does this require a JobFinishedEvent?
+  // JobFinishedEvent triggers the move of the history file out of the staging
+  // area. May need to create a new event type for this if JobFinished should 
+  // not be generated for KilledJobs, etc.
+  private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
+    JobFinishedEvent jfe = new JobFinishedEvent(
+        job.oldJobId, job.finishTime,
+        job.succeededMapTaskCount, job.succeededReduceTaskCount,
+        job.failedMapTaskCount, job.failedReduceTaskCount,
+        TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounters
+        TypeConverter.fromYarn(job.getCounters()), //TODO replace with ReduceCounters
+        TypeConverter.fromYarn(job.getCounters()));
+    return jfe;
   }
 
   // Task-start has been moved out of InitTransition, so this arc simply
@@ -1066,10 +1076,11 @@ public class JobImpl implements org.apac
   implements SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
+      job.setFinishTime();
       JobUnsuccessfulCompletionEvent failedEvent =
           new JobUnsuccessfulCompletionEvent(job.oldJobId,
               job.finishTime, 0, 0,
-              org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct
state
+              JobState.KILLED.toString());
       job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(JobState.KILLED);
     }
@@ -1187,14 +1198,6 @@ public class JobImpl implements org.apac
         job.failedReduceTaskCount*100 > 
         job.allowedReduceFailuresPercent*job.numReduceTasks) {
         job.setFinishTime();
-        JobUnsuccessfulCompletionEvent failedEvent =
-          new JobUnsuccessfulCompletionEvent(job.oldJobId,
-              job.finishTime,
-              job.failedMapTaskCount,
-              job.failedReduceTaskCount, //TODO finishedReduceTasks
-              org.apache.hadoop.mapreduce.JobStatus.State.FAILED.toString()); //TODO correct
state
-        job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
-        //TODO This event not likely required - sent via abort(). 
 
         String diagnosticMsg = "Job failed as tasks failed. " +
             "failedMaps:" + job.failedMapTaskCount + 
@@ -1214,17 +1217,9 @@ public class JobImpl implements org.apac
         }
        // Log job-history
         job.setFinishTime();
-        JobFinishedEvent jfe =
-        new JobFinishedEvent(TypeConverter.fromYarn(job.jobId),
-          job.finishTime,
-          job.succeededMapTaskCount, job.numReduceTasks, job.failedMapTaskCount,
-          job.failedReduceTaskCount,
-          TypeConverter.fromYarn(job.getCounters()), //TODO replace with MapCounter
-          TypeConverter.fromYarn(job.getCounters()), // TODO reduceCounters
-          TypeConverter.fromYarn(job.getCounters()));
+        JobFinishedEvent jfe = createJobFinishedEvent(job);
         LOG.info("Calling handler for JobFinishedEvent ");
         job.eventHandler.handle(new JobHistoryEvent(job.jobId, jfe));
-
         return job.finished(JobState.SUCCEEDED);
       }
       
@@ -1302,7 +1297,13 @@ public class JobImpl implements org.apac
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
-      //TODO JH Event?
+      //TODO Is this JH event required?
+      job.setFinishTime();
+      JobUnsuccessfulCompletionEvent failedEvent =
+          new JobUnsuccessfulCompletionEvent(job.oldJobId,
+              job.finishTime, 0, 0,
+              JobState.ERROR.toString());
+      job.eventHandler.handle(new JobHistoryEvent(job.jobId, failedEvent));
       job.finished(JobState.ERROR);
     }
   }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/MapTaskImpl.java
Fri Jun 10 09:16:37 2011
@@ -75,4 +75,7 @@ public class MapTaskImpl extends TaskImp
     return TaskType.MAP;
   }
 
+  protected TaskSplitMetaInfo getTaskSplitMetaInfo() {
+    return this.taskSplitMetaInfo;
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
Fri Jun 10 09:16:37 2011
@@ -850,6 +850,18 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  private static TaskAttemptUnsuccessfulCompletionEvent createTaskAttemptUnsuccessfulCompletionEvent(
+      TaskAttemptImpl taskAttempt, TaskAttemptState attemptState) {
+    TaskAttemptUnsuccessfulCompletionEvent tauce = new TaskAttemptUnsuccessfulCompletionEvent(
+        TypeConverter.fromYarn(taskAttempt.attemptId),
+        TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
+        attemptState.toString(), taskAttempt.finishTime,
+        taskAttempt.containerMgrAddress,
+        taskAttempt.reportedStatus.diagnosticInfo.toString());
+      //TODO Different constructor - allSplits
+    return tauce;
+  }
+
   private static String[] racks = new String[] {NetworkTopology.DEFAULT_RACK};
   private static class RequestContainerTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@@ -956,6 +968,10 @@ public abstract class TaskAttemptImpl im
               TaskEventType.T_ATTEMPT_KILLED));
           break;
       }
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, finalState);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
     }
   }
 
@@ -970,6 +986,7 @@ public abstract class TaskAttemptImpl im
       // for it
       taskAttempt.taskAttemptListener.register(
           taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
+      //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
                                                                   // Costly?
@@ -1058,17 +1075,11 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      TaskAttemptUnsuccessfulCompletionEvent ta =
-          new TaskAttemptUnsuccessfulCompletionEvent(
-          TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-          TaskAttemptState.FAILED.toString(),
-          taskAttempt.finishTime,
-          "hostname",
-          taskAttempt.reportedStatus.diagnosticInfo.toString());
-      taskAttempt.eventHandler.handle(
-          new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), ta));
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED);
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, TaskAttemptState.FAILED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
+//      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.FAILED); Not handling failed
map/reduce events.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_FAILED));
@@ -1081,9 +1092,9 @@ public abstract class TaskAttemptImpl im
          new MapAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
          state.toString(),
-         finishTime,
-         finishTime, "hostname",
-         state.toString(),
+         finishTime, //TODO TaskAttemptStatus changes. MapFinishTime
+         finishTime, this.containerMgrAddress,
+         state.toString(), //TODO state is a progress string.
          TypeConverter.fromYarn(getCounters()),null);
          eventHandler.handle(
            new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
@@ -1092,9 +1103,9 @@ public abstract class TaskAttemptImpl im
          new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(attemptId),
          TypeConverter.fromYarn(attemptId.getTaskId().getTaskType()),
          state.toString(),
-         finishTime,
-         finishTime,
-         finishTime, "hostname",
+         finishTime, //TODO TaskAttemptStatus changes. ShuffleFinishTime
+         finishTime, //TODO TaskAttemptStatus changes. SortFinishTime
+         finishTime, this.containerMgrAddress,
          state.toString(),
          TypeConverter.fromYarn(getCounters()),null);
          eventHandler.handle(
@@ -1110,6 +1121,10 @@ public abstract class TaskAttemptImpl im
       taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
       //set the finish time
       taskAttempt.setFinishTime();
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, TaskAttemptState.FAILED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
     }
@@ -1123,17 +1138,11 @@ public abstract class TaskAttemptImpl im
         TaskAttemptEvent event) {
       //set the finish time
       taskAttempt.setFinishTime();
-      TaskAttemptUnsuccessfulCompletionEvent tke =
-          new TaskAttemptUnsuccessfulCompletionEvent(
-          TypeConverter.fromYarn(taskAttempt.attemptId),
-          TypeConverter.fromYarn(taskAttempt.attemptId.getTaskId().getTaskType()),
-          TaskAttemptState.KILLED.toString(),
-          taskAttempt.finishTime,
-          TaskAttemptState.KILLED.toString(),
-          taskAttempt.reportedStatus.diagnosticInfo.toString());
-      taskAttempt.eventHandler.handle(
-          new JobHistoryEvent(taskAttempt.attemptId.getTaskId().getJobId(), tke));
-      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED);
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, TaskAttemptState.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
+//      taskAttempt.logAttemptFinishedEvent(TaskAttemptState.KILLED); Not logging Map/Reduce
attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId,
           TaskEventType.T_ATTEMPT_KILLED));

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
Fri Jun 10 09:16:37 2011
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
@@ -95,6 +96,7 @@ public abstract class TaskImpl implement
   private final Lock readLock;
   private final Lock writeLock;
   private final MRAppMetrics metrics;
+  private long scheduledTime;
   
   private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
   
@@ -386,10 +388,14 @@ public abstract class TaskImpl implement
         launchTime = at.getLaunchTime();
       }
     }
+    if (launchTime == 0) {
+      return this.scheduledTime;
+    }
     return launchTime;
   }
 
   //this is always called in read/write lock
+  //TODO Verify behaviour if Task is killed (no finished attempt)
   private long getFinishTime() {
     if (!isFinished()) {
       return 0;
@@ -404,6 +410,20 @@ public abstract class TaskImpl implement
     return finishTime;
   }
 
+  private long getFinishTime(TaskAttemptId taId) {
+    if (taId == null) {
+      return clock.getTime();
+    }
+    long finishTime = 0;
+    for (TaskAttempt at : attempts.values()) {
+      //select the max finish time of all attempts
+      if (at.getID().equals(taId)) {
+        return at.getFinishTime();
+      }
+    }
+    return finishTime;
+  }
+  
   private TaskState finished(TaskState finalState) {
     if (getState() == TaskState.RUNNING) {
       metrics.endRunningTask(this);
@@ -558,20 +578,55 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState)
{
+    TaskFinishedEvent tfe =
+      new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+        task.getFinishTime(task.successfulAttempt),
+        TypeConverter.fromYarn(task.taskId.getTaskType()),
+        taskState.toString(),
+        TypeConverter.fromYarn(task.getCounters()));
+    return tfe;
+  }
+  
+  private static TaskFailedEvent createTaskFailedEvent(TaskImpl task, String error, TaskState
taskState, TaskAttemptId taId) {
+    TaskFailedEvent taskFailedEvent = new TaskFailedEvent(
+        TypeConverter.fromYarn(task.taskId),
+     // Hack since getFinishTime needs isFinished to be true and that doesn't happen till
after the transition.
+        task.getFinishTime(taId),
+        TypeConverter.fromYarn(task.getType()),
+        error == null ? "" : error,
+        taskState.toString(),
+        taId == null ? null : TypeConverter.fromYarn(taId));
+    return taskFailedEvent;
+  }
+  
   private static class InitialScheduleTransition
     implements SingleArcTransition<TaskImpl, TaskEvent> {
 
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      TaskStartedEvent tse = new TaskStartedEvent(TypeConverter
-          .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
-          .fromYarn(task.taskId.getTaskType()), TaskState.RUNNING.toString());
-      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
-      //TODO This is a transition from NEW to SCHEDULED, not RUNNING
-
       task.addAndScheduleAttempt();
+      task.scheduledTime = task.clock.getTime();
+      TaskStartedEvent tse = new TaskStartedEvent(
+          TypeConverter.fromYarn(task.taskId), task.getLaunchTime(),
+          TypeConverter.fromYarn(task.taskId.getTaskType()),
+          task instanceof MapTaskImpl ? splitsAsString(((MapTaskImpl) task) //TODO Should
not be accessing MapTaskImpl
+              .getTaskSplitMetaInfo().getLocations()) : "");
+      task.eventHandler
+          .handle(new JobHistoryEvent(task.taskId.getJobId(), tse));
       task.metrics.launchedTask(task);
     }
+    
+    private String splitsAsString(String[] splits) {
+      if (splits == null || splits.length == 0)
+        return "";
+      StringBuilder sb = new StringBuilder();
+      for (int i = 0; i < splits.length; i++) {
+        if (i != 0) sb.append(",");
+        sb.append(splits[i]);
+      }
+      return sb.toString();
+    }
   }
 
   // Used when creating a new attempt while one is already running.
@@ -625,12 +680,7 @@ public abstract class TaskImpl implement
           task.taskId, TaskState.SUCCEEDED));
       LOG.info("Task succeeded with attempt " + task.successfulAttempt);
       // issue kill to all other attempts
-      TaskFinishedEvent tfe =
-          new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
-            task.getFinishTime(),
-            TypeConverter.fromYarn(task.taskId.getTaskType()),
-            TaskState.SUCCEEDED.toString(),
-            TypeConverter.fromYarn(task.getCounters()));
+      TaskFinishedEvent tfe = createTaskFinishedEvent(task, TaskState.SUCCEEDED);
       task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
       for (TaskAttempt attempt : task.attempts.values()) {
         if (attempt.getID() != task.successfulAttempt &&
@@ -675,6 +725,11 @@ public abstract class TaskImpl implement
           TaskAttemptCompletionEventStatus.KILLED);
       // check whether all attempts are finished
       if (task.finishedAttempts == task.attempts.size()) {
+        TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "",
+            finalState, null); //TODO JH verify failedAttempt null
+        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
+            taskFailedEvent)); 
+
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, finalState));
         return finalState;
@@ -704,13 +759,13 @@ public abstract class TaskImpl implement
             ((TaskTAttemptEvent) event).getTaskAttemptID(), 
             TaskAttemptCompletionEventStatus.TIPFAILED);
         TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
-        TaskFinishedEvent tfi =
-            new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
-                task.getFinishTime(),
-            TypeConverter.fromYarn(task.taskId.getTaskType()),
-                TaskState.FAILED.toString(),
-                TypeConverter.fromYarn(task.getCounters()));
-        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfi));
+        TaskAttemptId taId = ev.getTaskAttemptID();
+        
+        //TODO JH Populate the error string. FailReason from TaskAttempt(taId)
+        TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, "",
+            TaskState.FAILED, taId);
+        task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
+            taskFailedEvent));
         task.eventHandler.handle(
             new JobTaskEvent(task.taskId, TaskState.FAILED));
         return task.finished(TaskState.FAILED);
@@ -761,12 +816,12 @@ public abstract class TaskImpl implement
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
     public void transition(TaskImpl task, TaskEvent event) {
-      TaskFinishedEvent tfe =
-          new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
-              task.getFinishTime(), TypeConverter.fromYarn(task.taskId.getTaskType()),
-              TaskState.KILLED.toString(), TypeConverter.fromYarn(task
-                  .getCounters()));
-      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(), tfe));
+      
+      TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
+          TaskState.KILLED, null); //TODO Verify failedAttemptId is null
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
+          taskFailedEvent));
+
       task.eventHandler.handle(
           new JobTaskEvent(task.taskId, TaskState.KILLED));
       task.metrics.endWaitingTask(task);

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
(original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
Fri Jun 10 09:16:37 2011
@@ -298,7 +298,7 @@ public class RecoveryService extends Com
         // send the status update event
         sendStatusUpdateEvent(aId, attInfo);
 
-        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getState());
+        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
         switch (state) {
         case SUCCEEDED:
           // send the done event

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AppController.java Fri Jun 10 09:16:37 2011
@@ -121,7 +121,7 @@ public class AppController extends Contr
         notFound($(JOB_ID));
       }
     } catch (Exception e) {
-      badRequest(e.getMessage());
+      badRequest(e.getMessage() == null ? e.getClass().getName() : e.getMessage());
     }
   }
 

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedJob.java Fri Jun 10 09:16:37 2011
@@ -30,7 +30,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapreduce.JobACL;
-import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@@ -131,7 +130,7 @@ public class CompletedJob implements org
   }
 
   //History data is leisurely loaded when task level data is requested
-  private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) {
+  private synchronized void loadFullHistoryData(boolean loadTasks, Path historyFileAbsolute) throws IOException {
     if (jobInfo != null) {
       return; //data already loaded
     }
@@ -144,6 +143,8 @@ public class CompletedJob implements org
         throw new YarnException("Could not load history file " + historyFileAbsolute,
             e);
       }
+    } else {
+      throw new IOException("History file not found");
     }
     
     if (loadTasks) {

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java?rev=1134246&r1=1134245&r2=1134246&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java Fri Jun 10 09:16:37 2011
@@ -625,7 +625,7 @@ public class JobHistory extends Abstract
         addToLoadedJobCache(job);
         return job;
       } catch (IOException e) {
-        throw new YarnException(e);
+        throw new YarnException("Could not find/load job: " + metaInfo.getJobIndexInfo().getJobId(), e);
       }
     }
   }
@@ -979,6 +979,7 @@ public class JobHistory extends Abstract
     Job job = null;
     try {
       job = findJob(jobId);
+      //This could return a null job.
     } catch (IOException e) {
       throw new YarnException(e);
     }



Mime
View raw message