hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject svn commit: r652364 [3/4] - in /hadoop/core/trunk: ./ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/...
Date Wed, 30 Apr 2008 12:25:06 GMT
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java Wed Apr 30 05:25:05 2008
@@ -18,7 +18,8 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
+import java.io.IOException;
+
 
 /** 
  * <code>RunningJob</code> is the user-interface to query for details on a 
@@ -36,6 +37,12 @@
    * 
    * @return the job identifier.
    */
+  public JobID getID();
+  
+  /** @deprecated This method is deprecated and will be removed. Applications should 
+   * rather use {@link #getID()}.
+   */
+  @Deprecated
   public String getJobID();
   
   /**
@@ -128,8 +135,12 @@
    *                   job failure status.  
    * @throws IOException
    */
+  public void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
+  
+  /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
+  @Deprecated
   public void killTask(String taskId, boolean shouldFail) throws IOException;
-    
+  
   /**
    * Gets the counters for this job.
    * 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java Wed Apr 30 05:25:05 2008
@@ -290,9 +290,10 @@
       JobTracker tracker = 
         (JobTracker) getServletContext().getAttribute("job.tracker");
       
-      String jobId = request.getParameter("jobid");
-      if(jobId == null)
+      String jobIdStr = request.getParameter("jobid");
+      if(jobIdStr == null)
         return;
+      JobID jobId = JobID.forName(jobIdStr);
       String typeStr = request.getParameter("type");
       boolean isMap = false;
       if("map".equalsIgnoreCase(typeStr)) {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Apr 30 05:25:05 2008
@@ -22,32 +22,30 @@
 import java.io.DataOutput;
 import java.io.IOException;
 import java.net.URI;
+import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.text.NumberFormat;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.dfs.DistributedFileSystem;
+import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.fs.kfs.KosmosFileSystem;
 import org.apache.hadoop.fs.s3.S3FileSystem;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.UTF8;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.net.*;
 
 /** Base class for tasks. */
 abstract class Task implements Writable, Configurable {
@@ -98,9 +96,7 @@
   ////////////////////////////////////////////
 
   private String jobFile;                         // job configuration file
-  private String taskId;                          // unique, includes job id
-  private String jobId;                           // unique jobid
-  private String tipId;
+  private TaskAttemptID taskId;                          // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus; 										      // current status of the task
   private Path taskOutputPath;                    // task-specific output dir
@@ -117,12 +113,10 @@
     taskStatus = TaskStatus.createTaskStatus(isMapTask());
   }
 
-  public Task(String jobId, String jobFile, String tipId, 
-              String taskId, int partition) {
+  public Task(String jobFile, TaskAttemptID taskId, int partition) {
     this.jobFile = jobFile;
     this.taskId = taskId;
-    this.jobId = jobId;
-    this.tipId = tipId; 
+     
     this.partition = partition;
     this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
                                                   0.0f, 
@@ -132,7 +126,7 @@
                                                     TaskStatus.Phase.MAP : 
                                                     TaskStatus.Phase.SHUFFLE, 
                                                   counters);
-    this.mapOutputFile.setJobId(jobId);
+    this.mapOutputFile.setJobId(taskId.getJobID());
   }
 
   ////////////////////////////////////////////
@@ -140,16 +134,15 @@
   ////////////////////////////////////////////
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
-  public String getTaskId() { return taskId; }
-  public String getTipId(){ return tipId; }
+  public TaskAttemptID getTaskID() { return taskId; }
   public Counters getCounters() { return counters; }
   
   /**
    * Get the job name for this task.
    * @return the job name
    */
-  public String getJobId() {
-    return jobId;
+  public JobID getJobID() {
+    return taskId.getJobID();
   }
   
   /**
@@ -180,10 +173,8 @@
   ////////////////////////////////////////////
 
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, jobFile);
-    UTF8.writeString(out, tipId); 
-    UTF8.writeString(out, taskId);
-    UTF8.writeString(out, jobId);
+    Text.writeString(out, jobFile);
+    taskId.write(out);
     out.writeInt(partition);
     if (taskOutputPath != null) {
       Text.writeString(out, taskOutputPath.toString());
@@ -193,10 +184,8 @@
     taskStatus.write(out);
   }
   public void readFields(DataInput in) throws IOException {
-    jobFile = UTF8.readString(in);
-    tipId = UTF8.readString(in);
-    taskId = UTF8.readString(in);
-    jobId = UTF8.readString(in);
+    jobFile = Text.readString(in);
+    taskId = TaskAttemptID.read(in);
     partition = in.readInt();
     String outPath = Text.readString(in);
     if (outPath.length() != 0) {
@@ -205,10 +194,11 @@
       taskOutputPath = null;
     }
     taskStatus.readFields(in);
-    this.mapOutputFile.setJobId(jobId); 
+    this.mapOutputFile.setJobId(taskId.getJobID()); 
   }
 
-  public String toString() { return taskId; }
+  @Override
+  public String toString() { return taskId.toString(); }
 
   private Path getTaskOutputPath(JobConf conf) {
     Path p = new Path(FileOutputFormat.getOutputPath(conf), 
@@ -226,11 +216,11 @@
    * Localize the given JobConf to be specific for this task.
    */
   public void localizeConfiguration(JobConf conf) throws IOException {
-    conf.set("mapred.tip.id", tipId); 
-    conf.set("mapred.task.id", taskId);
+    conf.set("mapred.tip.id", taskId.getTaskID().toString()); 
+    conf.set("mapred.task.id", taskId.toString());
     conf.setBoolean("mapred.task.is.map", isMapTask());
     conf.setInt("mapred.task.partition", partition);
-    conf.set("mapred.job.id", jobId);
+    conf.set("mapred.job.id", taskId.getJobID().toString());
     
     // The task-specific output path
     if (FileOutputFormat.getOutputPath(conf) != null) {
@@ -303,7 +293,7 @@
                 Thread.sleep(PROGRESS_INTERVAL);
               } 
               catch (InterruptedException e) {
-                LOG.debug(getTaskId() + " Progress/ping thread exiting " +
+                LOG.debug(getTaskID() + " Progress/ping thread exiting " +
                                         "since it got interrupted");
                 break;
               }
@@ -345,7 +335,7 @@
       }, "Comm thread for "+taskId);
     thread.setDaemon(true);
     thread.start();
-    LOG.debug(getTaskId() + " Progress/ping thread started");
+    LOG.debug(getTaskID() + " Progress/ping thread started");
   }
 
   
@@ -463,7 +453,7 @@
           taskStatus.statusUpdate(taskProgress.get(), taskProgress.toString(), 
                                   counters);
           try {
-            if (!umbilical.statusUpdate(getTaskId(), taskStatus)) {
+            if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
               LOG.warn("Parent died.  Exiting "+taskId);
               System.exit(66);
             }
@@ -496,7 +486,7 @@
           shouldBePromoted = true;
         }
         umbilical.done(taskId, shouldBePromoted);
-        LOG.info("Task '" + getTaskId() + "' done.");
+        LOG.info("Task '" + getTaskID() + "' done.");
         return;
       } catch (IOException ie) {
         LOG.warn("Failure signalling completion: " + 
@@ -556,10 +546,10 @@
         // Delete the temporary task-specific output directory
         if (!fs.delete(taskOutputPath, true)) {
           LOG.info("Failed to delete the temporary output directory of task: " + 
-                  getTaskId() + " - " + taskOutputPath);
+                  getTaskID() + " - " + taskOutputPath);
         }
         
-        LOG.info("Saved output of task '" + getTaskId() + "' to " + jobOutputPath);
+        LOG.info("Saved output of task '" + getTaskID() + "' to " + jobOutputPath);
       }
     }
   }
@@ -580,11 +570,11 @@
       if (!fs.rename(taskOutput, finalOutputPath)) {
         if (!fs.delete(finalOutputPath, true)) {
           throw new IOException("Failed to delete earlier output of task: " + 
-                  getTaskId());
+                  getTaskID());
         }
         if (!fs.rename(taskOutput, finalOutputPath)) {
           throw new IOException("Failed to save output of task: " + 
-                  getTaskId());
+                  getTaskID());
         }
       }
       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Wed Apr 30 05:25:05 2008
@@ -21,6 +21,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -34,7 +35,7 @@
   private int eventId; 
   private String taskTrackerHttp;
   private int taskRunTime; // using int since runtime is the time difference
-  private String taskId;
+  private TaskAttemptID taskId;
   Status status; 
   boolean isMap = false;
   private int idWithinJob;
@@ -55,7 +56,7 @@
    * @param taskTrackerHttp task tracker's host:port for http. 
    */
   public TaskCompletionEvent(int eventId, 
-                             String taskId,
+                             TaskAttemptID taskId,
                              int idWithinJob,
                              boolean isMap,
                              Status status, 
@@ -78,10 +79,21 @@
   /**
    * Returns task id. 
    * @return task id
+   * @deprecated use {@link #getTaskID()} instead.
    */
+  @Deprecated
   public String getTaskId() {
+    return taskId.toString();
+  }
+  
+  /**
+   * Returns task id. 
+   * @return task id
+   */
+  public TaskAttemptID getTaskID() {
     return taskId;
   }
+  
   /**
    * Returns enum Status.SUCESS or Status.FAILURE.
    * @return task tracker status
@@ -123,11 +135,21 @@
   /**
    * Sets task id. 
    * @param taskId
+   * @deprecated use {@link #setTaskID(TaskAttemptID)} instead.
    */
-  public void setTaskId(
-                        String taskId) {
+  @Deprecated
+  public void setTaskId(String taskId) {
+    this.taskId = TaskAttemptID.forName(taskId);
+  }
+  
+  /**
+   * Sets task id. 
+   * @param taskId
+   */
+  public void setTaskID(TaskAttemptID taskId) {
     this.taskId = taskId;
   }
+  
   /**
    * Set task status. 
    * @param status
@@ -145,6 +167,7 @@
     this.taskTrackerHttp = taskTrackerHttp;
   }
     
+  @Override
   public String toString(){
     StringBuffer buf = new StringBuffer(); 
     buf.append("Task Id : "); 
@@ -165,7 +188,7 @@
   // Writable
   //////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeString(out, taskId); 
+    taskId.write(out); 
     WritableUtils.writeVInt(out, idWithinJob);
     out.writeBoolean(isMap);
     WritableUtils.writeEnum(out, status); 
@@ -174,7 +197,7 @@
   }
   
   public void readFields(DataInput in) throws IOException {
-    this.taskId = WritableUtils.readString(in); 
+    this.taskId = TaskAttemptID.read(in); 
     this.idWithinJob = WritableUtils.readVInt(in);
     this.isMap = in.readBoolean();
     this.status = WritableUtils.readEnum(in, Status.class);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Apr 30 05:25:05 2008
@@ -19,7 +19,6 @@
 
 
 import java.io.IOException;
-import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -29,8 +28,6 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
 
 
@@ -53,13 +50,6 @@
   int maxTaskAttempts = 4;    
   static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
-  static final String MAP_IDENTIFIER = "_m_";
-  static final String REDUCE_IDENTIFIER = "_r_";
-  private static NumberFormat idFormat = NumberFormat.getInstance();
-  static {
-    idFormat.setMinimumIntegerDigits(6);
-    idFormat.setGroupingUsed(false);
-  }
 
   public static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.TaskInProgress");
 
@@ -69,7 +59,7 @@
   private int numMaps;
   private int partition;
   private JobTracker jobtracker;
-  private String id;
+  private TaskID id;
   private JobInProgress job;
 
   // Status of the TIP
@@ -84,36 +74,33 @@
   private int completes = 0;
   private boolean failed = false;
   private boolean killed = false;
-
-  // The 'unique' prefix for taskids of this tip
-  String taskIdPrefix;
-    
+   
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
     
   // The taskid that took this TIP to SUCCESS
-  private String successfulTaskId;
+  private TaskAttemptID successfulTaskId;
   
   // Map from task Id -> TaskTracker Id, contains tasks that are
   // currently runnings
-  private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
+  private TreeMap<TaskAttemptID, String> activeTasks = new TreeMap<TaskAttemptID, String>();
   private JobConf conf;
-  private Map<String,List<String>> taskDiagnosticData =
-    new TreeMap<String,List<String>>();
+  private Map<TaskAttemptID,List<String>> taskDiagnosticData =
+    new TreeMap<TaskAttemptID,List<String>>();
   /**
    * Map from taskId -> TaskStatus
    */
-  private TreeMap<String,TaskStatus> taskStatuses = 
-    new TreeMap<String,TaskStatus>();
+  private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
+    new TreeMap<TaskAttemptID,TaskStatus>();
 
   // Map from taskId -> Task
-  private Map<String, Task> tasks = new TreeMap<String, Task>();
+  private Map<TaskAttemptID, Task> tasks = new TreeMap<TaskAttemptID, Task>();
 
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
-  private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
+  private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
   
   //list of tasks to kill, <taskid> -> <shouldFail> 
-  private TreeMap<String, Boolean> tasksToKill = new TreeMap<String, Boolean>();
+  private TreeMap<TaskAttemptID, Boolean> tasksToKill = new TreeMap<TaskAttemptID, Boolean>();
   
   private Counters counters = new Counters();
   
@@ -121,7 +108,7 @@
   /**
    * Constructor for MapTask
    */
-  public TaskInProgress(String jobid, String jobFile, 
+  public TaskInProgress(JobID jobid, String jobFile, 
                         RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
                         JobInProgress job, int partition) {
@@ -132,13 +119,13 @@
     this.conf = conf;
     this.partition = partition;
     setMaxTaskAttempts();
-    init(JobTracker.getJobUniqueString(jobid));
+    init(jobid);
   }
         
   /**
    * Constructor for ReduceTask
    */
-  public TaskInProgress(String jobid, String jobFile, 
+  public TaskInProgress(JobID jobid, String jobFile, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
                         JobInProgress job) {
@@ -149,7 +136,7 @@
     this.job = job;
     this.conf = conf;
     setMaxTaskAttempts();
-    init(JobTracker.getJobUniqueString(jobid));
+    init(jobid);
   }
   
   /**
@@ -162,39 +149,10 @@
       this.maxTaskAttempts = conf.getMaxReduceAttempts();
     }
   }
-  
-  /**
-   * Return true if the tip id represents a map
-   * @param tipId the tip id
-   * @return whether the tip is a map tip or a reduce tip
-   */
-  public static boolean isMapId(String tipId) {
-    if (tipId.contains(MAP_IDENTIFIER))  {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Make a unique name for this TIP.
-   * @param uniqueBase The unique name of the job
-   * @return The unique string for this tip
-   */
-  private String makeUniqueString(String uniqueBase) {
-    StringBuilder result = new StringBuilder();
-    result.append(uniqueBase);
-    if (isMapTask()) {
-      result.append(MAP_IDENTIFIER);
-    } else {
-      result.append(REDUCE_IDENTIFIER);
-    }
-    result.append(idFormat.format(partition));
-    return result.toString();
-  }
     
   /**
    * Return the index of the tip within the job, so 
-   * "tip_200707121733_1313_0002_m_012345" would return 12345;
+   * "task_200707121733_1313_0002_m_012345" would return 12345;
    * @return int the tip index
    */
   public int idWithinJob() {
@@ -213,10 +171,9 @@
   /**
    * Initialization common to Map and Reduce
    */
-  void init(String jobUniqueString) {
+  void init(JobID jobId) {
     this.startTime = System.currentTimeMillis();
-    this.taskIdPrefix = makeUniqueString(jobUniqueString);
-    this.id = "tip_" + this.taskIdPrefix;
+    this.id = new TaskID(jobId, isMapTask(), partition);
   }
 
   ////////////////////////////////////
@@ -232,7 +189,7 @@
   /**
    * Return an ID for this task, not its component taskid-threads
    */
-  public String getTIPId() {
+  public TaskID getTIPId() {
     return this.id;
   }
   /**
@@ -247,7 +204,7 @@
    * @param taskId
    * @return
    */  
-  public Task getTaskObject(String taskId) {
+  public Task getTask(TaskAttemptID taskId) {
     return tasks.get(taskId);
   }
   
@@ -259,16 +216,16 @@
     return !activeTasks.isEmpty();
   }
     
-  private String getSuccessfulTaskid() {
+  private TaskAttemptID getSuccessfulTaskid() {
     return successfulTaskId;
   }
   
-  private void setSuccessfulTaskid(String successfulTaskId) {
+  private void setSuccessfulTaskid(TaskAttemptID successfulTaskId) {
     this.successfulTaskId = successfulTaskId; 
   }
   
   private void resetSuccessfulTaskid() {
-    this.successfulTaskId = ""; 
+    this.successfulTaskId = null; 
   }
   
   /**
@@ -286,9 +243,9 @@
    * @param taskid taskid of attempt to check for completion
    * @return <code>true</code> if taskid is complete, else <code>false</code>
    */
-  public boolean isComplete(String taskid) {
-    return ((completes > 0) && 
-             getSuccessfulTaskid().equals(taskid));
+  public boolean isComplete(TaskAttemptID taskid) {
+    return ((completes > 0) 
+            && taskid.equals(getSuccessfulTaskid()));
   }
 
   /**
@@ -327,12 +284,13 @@
   public Counters getCounters() {
     return counters;
   }
+
   /**
    * Returns whether a component task-thread should be 
    * closed because the containing JobInProgress has completed
    * or the task is killed by the user
    */
-  public boolean shouldClose(String taskid) {
+  public boolean shouldClose(TaskAttemptID taskid) {
     /**
      * If the task hasn't been closed yet, and it belongs to a completed
      * TaskInProgress close it.
@@ -381,7 +339,7 @@
    * @param taskId the id of the required task
    * @return the list of diagnostics for that task
    */
-  synchronized List<String> getDiagnosticInfo(String taskId) {
+  synchronized List<String> getDiagnosticInfo(TaskAttemptID taskId) {
     return taskDiagnosticData.get(taskId);
   }
     
@@ -396,7 +354,7 @@
    * @param taskId id of the task 
    * @param diagInfo diagnostic information for the task
    */
-  public void addDiagnosticInfo(String taskId, String diagInfo) {
+  public void addDiagnosticInfo(TaskAttemptID taskId, String diagInfo) {
     List<String> diagHistory = taskDiagnosticData.get(taskId);
     if (diagHistory == null) {
       diagHistory = new ArrayList<String>();
@@ -412,7 +370,7 @@
    * @return has the task changed its state noticably?
    */
   synchronized boolean updateStatus(TaskStatus status) {
-    String taskid = status.getTaskId();
+    TaskAttemptID taskid = status.getTaskID();
     String diagInfo = status.getDiagnosticInfo();
     TaskStatus oldStatus = taskStatuses.get(taskid);
     boolean changed = true;
@@ -461,7 +419,7 @@
    * Indicate that one of the taskids in this TaskInProgress
    * has failed.
    */
-  public void incompleteSubTask(String taskid, String trackerName, 
+  public void incompleteSubTask(TaskAttemptID taskid, String trackerName, 
                                 JobStatus jobStatus) {
     //
     // Note the failure and its location
@@ -529,7 +487,7 @@
    * @param taskId id of the completed task-attempt
    * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
    */
-  private void completedTask(String taskId, TaskStatus.State finalTaskState) {
+  private void completedTask(TaskAttemptID taskId, TaskStatus.State finalTaskState) {
     TaskStatus status = taskStatuses.get(taskId);
     status.setRunState(finalTaskState);
     activeTasks.remove(taskId);
@@ -540,7 +498,7 @@
    * TaskInProgress has successfully completed; hence we mark this
    * taskid as {@link TaskStatus.State.KILLED}. 
    */
-  void alreadyCompletedTask(String taskid) {
+  void alreadyCompletedTask(TaskAttemptID taskid) {
     // 'KILL' the task 
     completedTask(taskid, TaskStatus.State.KILLED);
     
@@ -555,7 +513,7 @@
    * Indicate that one of the taskids in this TaskInProgress
    * has successfully completed!
    */
-  public void completed(String taskid) {
+  public void completed(TaskAttemptID taskid) {
     //
     // Record that this taskid is complete
     //
@@ -594,7 +552,7 @@
    * @param taskid
    * @return
    */
-  public TaskStatus getTaskStatus(String taskid) {
+  public TaskStatus getTaskStatus(TaskAttemptID taskid) {
     return taskStatuses.get(taskid);
   }
   /**
@@ -620,7 +578,7 @@
   /**
    * Kill the given task
    */
-  boolean killTask(String taskId, boolean shouldFail) {
+  boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
         || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
@@ -651,8 +609,8 @@
       double bestProgress = 0;
       String bestState = "";
       Counters bestCounters = new Counters();
-      for (Iterator<String> it = taskStatuses.keySet().iterator(); it.hasNext();) {
-        String taskid = it.next();
+      for (Iterator<TaskAttemptID> it = taskStatuses.keySet().iterator(); it.hasNext();) {
+        TaskAttemptID taskid = it.next();
         TaskStatus status = taskStatuses.get(taskid);
         if (status.getRunState() == TaskStatus.State.SUCCEEDED) {
           bestProgress = 1;
@@ -723,9 +681,9 @@
     }
 
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
-    String taskid = null;
+    TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
-      taskid = "task_" + taskIdPrefix + "_" + nextTaskId;
+      taskid = new TaskAttemptID( id, nextTaskId);
       ++nextTaskId;
     } else {
       LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + maxTaskAttempts) +
@@ -733,14 +691,12 @@
               " attempts for the tip '" + getTIPId() + "'");
       return null;
     }
-        
-    String jobId = job.getProfile().getJobId();
 
     if (isMapTask()) {
-      t = new MapTask(jobId, jobFile, this.id, taskid, partition, 
-                      rawSplit.getClassName(), rawSplit.getBytes());
+      t = new MapTask(jobFile, taskid, partition, 
+          rawSplit.getClassName(), rawSplit.getBytes());
     } else {
-      t = new ReduceTask(jobId, jobFile, this.id, taskid, partition, numMaps);
+      t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
     t.setConf(conf);
     tasks.put(taskid, t);
@@ -799,11 +755,4 @@
   public int getSuccessEventNumber() {
     return successEventNumber;
   }
-  
-  /**
-   * Gets the tip id for the given taskid
-   * */
-  public static String getTipId(String taskId){
-	  return taskId.substring(0, taskId.lastIndexOf('_')).replace("task", "tip");
-  }
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java Wed Apr 30 05:25:05 2008
@@ -18,11 +18,16 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 
 /**
@@ -44,8 +49,8 @@
     }
   }
 
-  public static File getTaskLogFile(String taskid, LogName filter) {
-    return new File(new File(LOG_DIR, taskid), filter.toString());
+  public static File getTaskLogFile(TaskAttemptID taskid, LogName filter) {
+    return new File(new File(LOG_DIR, taskid.toString()), filter.toString());
   }
   
   /**
@@ -73,6 +78,7 @@
       this.prefix = prefix;
     }
     
+    @Override
     public String toString() {
       return prefix;
     }
@@ -124,7 +130,7 @@
      * @param end the offset to read upto (negative is relative to tail)
      * @throws IOException
      */
-    public Reader(String taskid, LogName kind, 
+    public Reader(TaskAttemptID taskid, LogName kind, 
                   long start, long end) throws IOException {
       // find the right log file
       File filename = getTaskLogFile(taskid, kind);
@@ -152,6 +158,7 @@
       }
     }
     
+    @Override
     public int read() throws IOException {
       int result = -1;
       if (bytesRemaining > 0) {
@@ -161,6 +168,7 @@
       return result;
     }
     
+    @Override
     public int read(byte[] buffer, int offset, int length) throws IOException {
       length = (int) Math.min(length, bytesRemaining);
       int bytes = file.read(buffer, offset, length);
@@ -170,10 +178,12 @@
       return bytes;
     }
     
+    @Override
     public int available() throws IOException {
       return (int) Math.min(bytesRemaining, file.available());
     }
 
+    @Override
     public void close() throws IOException {
       file.close();
     }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java Wed Apr 30 05:25:05 2008
@@ -30,7 +30,8 @@
  * 
  */
 public class TaskLogAppender extends FileAppender {
-  private String taskId;
+  private String taskId; //taskId should be managed as String rather than TaskID object
+  //so that log4j can configure it from the configuration(log4j.properties). 
   private int maxEvents;
   private Queue<LoggingEvent> tail = null;
 
@@ -40,7 +41,7 @@
       if (maxEvents > 0) {
         tail = new LinkedList<LoggingEvent>();
       }
-      setFile(TaskLog.getTaskLogFile(taskId, 
+      setFile(TaskLog.getTaskLogFile(TaskAttemptID.forName(taskId), 
                                      TaskLog.LogName.SYSLOG).toString());
       setAppend(true);
       super.activateOptions();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java Wed Apr 30 05:25:05 2008
@@ -18,12 +18,14 @@
 package org.apache.hadoop.mapred;
 
 import java.io.File;
-import java.io.InputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 
 import javax.servlet.ServletException;
-import javax.servlet.http.*;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.util.StringUtils;
 
@@ -31,8 +33,9 @@
  * A servlet that is run by the TaskTrackers to provide the task logs via http.
  */
 public class TaskLogServlet extends HttpServlet {
+  private static final long serialVersionUID = -6615764817774487321L;
   
-  private boolean haveTaskLog(String taskId, TaskLog.LogName type) {
+  private boolean haveTaskLog(TaskAttemptID taskId, TaskLog.LogName type) {
     File f = TaskLog.getTaskLogFile(taskId, type);
     return f.canRead();
   }
@@ -86,7 +89,7 @@
   }
 
   private void printTaskLog(HttpServletResponse response,
-                            OutputStream out, String taskId, 
+                            OutputStream out, TaskAttemptID taskId, 
                             long start, long end, boolean plainText, 
                             TaskLog.LogName filter) throws IOException {
     if (!plainText) {
@@ -135,6 +138,7 @@
   /**
    * Get the logs via http.
    */
+  @Override
   public void doGet(HttpServletRequest request, 
                     HttpServletResponse response
                     ) throws ServletException, IOException {
@@ -143,12 +147,13 @@
     boolean plainText = false;
     TaskLog.LogName filter = null;
 
-    String taskId = request.getParameter("taskid");
-    if (taskId == null) {
+    String taskIdStr = request.getParameter("taskid");
+    if (taskIdStr == null) {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST, 
                          "Argument taskid is required");
       return;
     }
+    TaskAttemptID taskId = TaskAttemptID.forName(taskIdStr);
     String logFilter = request.getParameter("filter");
     if (logFilter != null) {
       try {

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java Wed Apr 30 05:25:05 2008
@@ -27,7 +27,7 @@
 
 /** A report on the state of a task. */
 public class TaskReport implements Writable {
-  private String taskid;
+  private TaskID taskid;
   private float progress;
   private String state;
   private String[] diagnostics;
@@ -37,7 +37,7 @@
 
   public TaskReport() {}
 
-  TaskReport(String taskid, float progress, String state,
+  TaskReport(TaskID taskid, float progress, String state,
              String[] diagnostics, long startTime, long finishTime,
              Counters counters) {
     this.taskid = taskid;
@@ -49,8 +49,11 @@
     this.counters = counters;
   }
     
+  /** @deprecated use {@link #getTaskID()} instead */
+  @Deprecated
+  public String getTaskId() { return taskid.toString(); }
   /** The id of the task. */
-  public String getTaskId() { return taskid; }
+  public TaskID getTaskID() { return taskid; }
   /** The amount completed, between zero and one. */
   public float getProgress() { return progress; }
   /** The most recent state, reported by a {@link Reporter}. */
@@ -94,7 +97,7 @@
   // Writable
   //////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    Text.writeString(out, taskid);
+    taskid.write(out);
     out.writeFloat(progress);
     Text.writeString(out, state);
     out.writeLong(startTime);
@@ -104,7 +107,7 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.taskid = Text.readString(in);
+    this.taskid = TaskID.read(in);
     this.progress = in.readFloat();
     this.state = Text.readString(in);
     this.startTime = in.readLong(); 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Apr 30 05:25:05 2008
@@ -56,7 +56,7 @@
     this.t = t;
     this.tracker = tracker;
     this.conf = conf;
-    this.mapOutputFile = new MapOutputFile(t.getJobId());
+    this.mapOutputFile = new MapOutputFile(t.getJobID());
     this.mapOutputFile.setConf(conf);
   }
 
@@ -88,12 +88,13 @@
     return str.toString();
   }
   
+  @Override
   public final void run() {
     try {
       
       //before preparing the job localize 
       //all the archives
-      String taskid = t.getTaskId();
+      TaskAttemptID taskid = t.getTaskID();
       LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
       File jobCacheDir = null;
       if (conf.getJar() != null) {
@@ -102,8 +103,8 @@
       }
       File workDir = new File(lDirAlloc.getLocalPathToRead(
                                 TaskTracker.getJobCacheSubdir() 
-                                + Path.SEPARATOR + t.getJobId() 
-                                + Path.SEPARATOR + t.getTaskId()
+                                + Path.SEPARATOR + t.getJobID() 
+                                + Path.SEPARATOR + t.getTaskID()
                                 + Path.SEPARATOR + "work",
                                 conf). toString());
 
@@ -293,7 +294,7 @@
       //  </property>
       //
       String javaOpts = conf.get("mapred.child.java.opts", "-Xmx200m");
-      javaOpts = replaceAll(javaOpts, "@taskid@", taskid);
+      javaOpts = replaceAll(javaOpts, "@taskid@", taskid.toString());
       String [] javaOptsSplit = javaOpts.split(" ");
       
       // Add java.library.path; necessary for loading native libraries.
@@ -368,7 +369,7 @@
       InetSocketAddress address = tracker.getTaskTrackerReportAddress();
       vargs.add(address.getAddress().getHostAddress()); 
       vargs.add(Integer.toString(address.getPort())); 
-      vargs.add(taskid);                      // pass task identifier
+      vargs.add(taskid.toString());                      // pass task identifier
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -393,18 +394,18 @@
     } catch (FSError e) {
       LOG.fatal("FSError", e);
       try {
-        tracker.fsError(t.getTaskId(), e.getMessage());
+        tracker.fsError(t.getTaskID(), e.getMessage());
       } catch (IOException ie) {
-        LOG.fatal(t.getTaskId()+" reporting FSError", ie);
+        LOG.fatal(t.getTaskID()+" reporting FSError", ie);
       }
     } catch (Throwable throwable) {
-      LOG.warn(t.getTaskId()+" Child Error", throwable);
+      LOG.warn(t.getTaskID()+" Child Error", throwable);
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       throwable.printStackTrace(new PrintStream(baos));
       try {
-        tracker.reportDiagnosticInfo(t.getTaskId(), baos.toString());
+        tracker.reportDiagnosticInfo(t.getTaskID(), baos.toString());
       } catch (IOException e) {
-        LOG.warn(t.getTaskId()+" Reporting Diagnostics", e);
+        LOG.warn(t.getTaskID()+" Reporting Diagnostics", e);
       }
     } finally {
       try{
@@ -423,7 +424,7 @@
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskId());
+      tracker.reportTaskFinished(t.getTaskID());
     }
   }
 
@@ -461,7 +462,7 @@
    * Run the child process
    */
   private void runChild(List<String> args, File dir,
-                        String taskid) throws IOException {
+                        TaskAttemptID taskid) throws IOException {
 
     try {
       shexec = new ShellCommandExecutor(args.toArray(new String[0]), dir);

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Apr 30 05:25:05 2008
@@ -24,7 +24,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 /**************************************************
@@ -43,7 +43,7 @@
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
                             COMMIT_PENDING}
     
-  private String taskid;
+  private TaskAttemptID taskid;
   private float progress;
   private State runState;
   private String diagnosticInfo;
@@ -59,7 +59,7 @@
 
   public TaskStatus() {}
 
-  public TaskStatus(String taskid, float progress,
+  public TaskStatus(TaskAttemptID taskid, float progress,
                     State runState, String diagnosticInfo,
                     String stateString, String taskTracker,
                     Phase phase, Counters counters) {
@@ -74,7 +74,7 @@
     this.includeCounters = true;
   }
   
-  public String getTaskId() { return taskid; }
+  public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
   public float getProgress() { return progress; }
   public void setProgress(float progress) { this.progress = progress; } 
@@ -211,7 +211,7 @@
    * 
    * @return the list of maps from which output-fetches failed.
    */
-  public List<String> getFetchFailedMaps() {
+  public List<TaskAttemptID> getFetchFailedMaps() {
     return null;
   }
   
@@ -220,7 +220,7 @@
    *  
    * @param mapTaskId map from which fetch failed
    */
-  synchronized void addFetchFailedMap(String mapTaskId) {}
+  synchronized void addFetchFailedMap(TaskAttemptID mapTaskId) {}
 
   /**
    * Update the status of the task.
@@ -271,6 +271,7 @@
     diagnosticInfo = "";
   }
 
+  @Override
   public Object clone() {
     try {
       return super.clone();
@@ -284,11 +285,11 @@
   // Writable
   //////////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, taskid);
+    taskid.write(out);
     out.writeFloat(progress);
     WritableUtils.writeEnum(out, runState);
-    UTF8.writeString(out, diagnosticInfo);
-    UTF8.writeString(out, stateString);
+    Text.writeString(out, diagnosticInfo);
+    Text.writeString(out, stateString);
     WritableUtils.writeEnum(out, phase);
     out.writeLong(startTime);
     out.writeLong(finishTime);
@@ -299,11 +300,11 @@
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.taskid = UTF8.readString(in);
+    this.taskid = TaskAttemptID.read(in);
     this.progress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
-    this.diagnosticInfo = UTF8.readString(in);
-    this.stateString = UTF8.readString(in);
+    this.diagnosticInfo = Text.readString(in);
+    this.stateString = Text.readString(in);
     this.phase = WritableUtils.readEnum(in, Phase.class); 
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 
@@ -318,7 +319,7 @@
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
   
-  static TaskStatus createTaskStatus(DataInput in, String taskId, float progress,
+  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress,
                                      State runState, String diagnosticInfo,
                                      String stateString, String taskTracker,
                                      Phase phase, Counters counters) 
@@ -328,7 +329,7 @@
                           stateString, taskTracker, phase, counters);
   }
   
-  static TaskStatus createTaskStatus(boolean isMap, String taskId, float progress,
+  static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress,
                                    State runState, String diagnosticInfo,
                                    String stateString, String taskTracker,
                                    Phase phase, Counters counters) { 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Apr 30 05:25:05 2008
@@ -26,7 +26,7 @@
 import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
 import java.net.URI;
-import java.net.URISyntaxException; 
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -36,10 +36,10 @@
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.Vector;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Pattern;
-import java.util.Vector;
 
 import javax.servlet.ServletContext;
 import javax.servlet.ServletException;
@@ -55,10 +55,9 @@
 import org.apache.hadoop.fs.FSError;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -77,6 +76,7 @@
 import org.apache.hadoop.util.RunJar;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 import org.apache.log4j.LogManager;
 
 /*******************************************************
@@ -124,12 +124,12 @@
     
   boolean shuttingDown = false;
     
-  Map<String, TaskInProgress> tasks = new HashMap<String, TaskInProgress>();
+  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
   /**
    * Map from taskId -> TaskInProgress.
    */
-  Map<String, TaskInProgress> runningTasks = null;
-  Map<String, RunningJob> runningJobs = null;
+  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
+  Map<JobID, RunningJob> runningJobs = null;
   volatile int mapTotal = 0;
   volatile int reduceTotal = 0;
   boolean justStarted = true;
@@ -303,10 +303,10 @@
                 TaskInProgress tip;
                 KillTaskAction killAction = (KillTaskAction) action;
                 synchronized (TaskTracker.this) {
-                  tip = tasks.get(killAction.getTaskId());
+                  tip = tasks.get(killAction.getTaskID());
                 }
                 LOG.info("Received KillTaskAction for task: " + 
-                         killAction.getTaskId());
+                         killAction.getTaskID());
                 purgeTask(tip, false);
               } else {
                 LOG.error("Non-delete action given to cleanup thread: "
@@ -323,7 +323,7 @@
     taskCleanupThread.start();
   }
     
-  private RunningJob addTaskToJob(String jobId, 
+  private RunningJob addTaskToJob(JobID jobId, 
                                   Path localJobFile,
                                   TaskInProgress tip) {
     synchronized (runningJobs) {
@@ -345,7 +345,7 @@
     }
   }
 
-  private void removeTaskFromJob(String jobId, TaskInProgress tip) {
+  private void removeTaskFromJob(JobID jobId, TaskInProgress tip) {
     synchronized (runningJobs) {
       RunningJob rjob = runningJobs.get(jobId);
       if (rjob == null) {
@@ -403,8 +403,8 @@
 
     // Clear out state tables
     this.tasks.clear();
-    this.runningTasks = new TreeMap<String, TaskInProgress>();
-    this.runningJobs = new TreeMap<String, RunningJob>();
+    this.runningTasks = new TreeMap<TaskAttemptID, TaskInProgress>();
+    this.runningJobs = new TreeMap<JobID, RunningJob>();
     this.mapTotal = 0;
     this.reduceTotal = 0;
     this.acceptNewTasks = true;
@@ -480,9 +480,9 @@
 
     private List <FetchStatus> reducesInShuffle() {
       List <FetchStatus> fList = new ArrayList<FetchStatus>();
-      for (Map.Entry <String, RunningJob> item : runningJobs.entrySet()) {
+      for (Map.Entry <JobID, RunningJob> item : runningJobs.entrySet()) {
         RunningJob rjob = item.getValue();
-        String jobId = item.getKey();
+        JobID jobId = item.getKey();
         FetchStatus f;
         synchronized (rjob) {
           f = rjob.getFetchStatus();
@@ -511,6 +511,7 @@
       return fList;
     }
       
+    @Override
     public void run() {
       LOG.info("Starting thread: " + getName());
         
@@ -577,9 +578,9 @@
     /** This is the cache of map events for a given job */ 
     private List<TaskCompletionEvent> allMapEvents;
     /** What jobid this fetchstatus object is for*/
-    private String jobId;
+    private JobID jobId;
      
-    public FetchStatus(String jobId, int numMaps) {
+    public FetchStatus(JobID jobId, int numMaps) {
       this.fromEventId = new IntWritable(0);
       this.jobId = jobId;
       this.allMapEvents = new ArrayList<TaskCompletionEvent>(numMaps);
@@ -625,7 +626,8 @@
   private void localizeJob(TaskInProgress tip) throws IOException {
     Path localJarFile = null;
     Task t = tip.getTask();
-    String jobId = t.getJobId();
+    
+    JobID jobId = t.getJobID();
     String jobFile = t.getJobFile();
     // Get sizes of JobFile and JarFile
     // sizes are -1 if they are not present.
@@ -758,8 +760,8 @@
     // Kill running tasks.  Do this in a 2nd vector, called 'tasksToClose',
     // because calling jobHasFinished() may result in an edit to 'tasks'.
     //
-    TreeMap<String, TaskInProgress> tasksToClose =
-      new TreeMap<String, TaskInProgress>();
+    TreeMap<TaskAttemptID, TaskInProgress> tasksToClose =
+      new TreeMap<TaskAttemptID, TaskInProgress>();
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
       tip.jobHasFinished(false);
@@ -770,6 +772,7 @@
     // time to shutdown.  (They need to wait a full
     // RPC timeout, which might be 10-30 seconds.)
     new Thread("RPC shutdown") {
+      @Override
       public void run() {
         if (taskReportServer != null) {
           taskReportServer.stop();
@@ -851,7 +854,7 @@
    * @throws IOException
    */  
   private List<TaskCompletionEvent> queryJobTracker(IntWritable fromEventId,
-                                                    String jobId,
+                                                    JobID jobId,
                                                     InterTrackerProtocol jobClient)
     throws IOException {
 
@@ -1033,7 +1036,7 @@
           } catch (MetricsException me) {
             LOG.warn("Caught: " + StringUtils.stringifyException(me));
           }
-          runningTasks.remove(taskStatus.getTaskId());
+          runningTasks.remove(taskStatus.getTaskID());
         }
       }
       
@@ -1089,9 +1092,9 @@
         long timeSinceLastReport = now - tip.getLastProgressReport();
         if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
           String msg = 
-            "Task " + tip.getTask().getTaskId() + " failed to report status for " 
+            "Task " + tip.getTask().getTaskID() + " failed to report status for " 
             + (timeSinceLastReport / 1000) + " seconds. Killing!";
-          LOG.info(tip.getTask().getTaskId() + ": " + msg);
+          LOG.info(tip.getTask().getTaskID() + ": " + msg);
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           myMetrics.timedoutTask();
@@ -1107,7 +1110,7 @@
    * @throws IOException
    */
   private synchronized void purgeJob(KillJobAction action) throws IOException {
-    String jobId = action.getJobId();
+    JobID jobId = action.getJobID();
     LOG.info("Received 'KillJobAction' for job: " + jobId);
     RunningJob rjob = null;
     synchronized (runningJobs) {
@@ -1126,7 +1129,7 @@
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
           fConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                 Path.SEPARATOR +  rjob.getJobId());
+                                 Path.SEPARATOR +  rjob.getJobID());
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1148,11 +1151,11 @@
   private void purgeTask(TaskInProgress tip, boolean wasFailure) 
   throws IOException {
     if (tip != null) {
-      LOG.info("About to purge task: " + tip.getTask().getTaskId());
+      LOG.info("About to purge task: " + tip.getTask().getTaskID());
         
       // Remove the task from running jobs, 
       // removing the job if it's the last task
-      removeTaskFromJob(tip.getTask().getJobId(), tip);
+      removeTaskFromJob(tip.getTask().getJobID(), tip);
       tip.jobHasFinished(wasFailure);
     }
   }
@@ -1179,7 +1182,7 @@
         if (killMe!=null) {
           String msg = "Tasktracker running out of space." +
             " Killing task.";
-          LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+          LOG.info(killMe.getTask().getTaskID() + ": " + msg);
           killMe.reportDiagnosticInfo(msg);
           purgeTask(killMe, false);
         }
@@ -1258,11 +1261,11 @@
    */
   private void startNewTask(LaunchTaskAction action) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction: " + t.getTaskId());
+    LOG.info("LaunchTaskAction: " + t.getTaskID());
     TaskInProgress tip = new TaskInProgress(t, this.fConf);
     synchronized (this) {
-      tasks.put(t.getTaskId(), tip);
-      runningTasks.put(t.getTaskId(), tip);
+      tasks.put(t.getTaskID(), tip);
+      runningTasks.put(t.getTaskID(), tip);
       boolean isMap = t.isMapTask();
       if (isMap) {
         mapTotal++;
@@ -1273,14 +1276,14 @@
     try {
       localizeJob(tip);
     } catch (Throwable e) {
-      String msg = ("Error initializing " + tip.getTask().getTaskId() + 
+      String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
       LOG.warn(msg);
       tip.reportDiagnosticInfo(msg);
       try {
         tip.kill(true);
       } catch (IOException ie2) {
-        LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
+        LOG.info("Error cleaning up " + tip.getTask().getTaskID() + ":\n" +
                  StringUtils.stringifyException(ie2));          
       }
         
@@ -1369,7 +1372,7 @@
       this.lastProgressReport = System.currentTimeMillis();
       this.defaultJobConf = conf;
       localJobConf = null;
-      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskId(), 
+      taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
                                                TaskStatus.State.UNASSIGNED, 
                                                diagnosticInfo.toString(), 
@@ -1382,10 +1385,12 @@
     }
         
     private void localizeTask(Task task) throws IOException{
+
       Path localTaskDir = 
         lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
-                    Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
-                    task.getTaskId()), defaultJobConf );
+                    Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
+                    task.getTaskID()), defaultJobConf );
+      
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
         throw new IOException("Mkdirs failed to create " 
@@ -1395,7 +1400,7 @@
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
                          TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobId() 
+                         + Path.SEPARATOR + task.getJobID() 
                          + Path.SEPARATOR  
                          + "work", defaultJobConf).toString();
       String link = localTaskDir.getParent().toString() 
@@ -1407,8 +1412,8 @@
       // create the working-directory of the task 
       Path cwd = lDirAlloc.getLocalPathForWrite(
                          TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobId() 
-                         + Path.SEPARATOR + task.getTaskId()
+                         + Path.SEPARATOR + task.getJobID() 
+                         + Path.SEPARATOR + task.getTaskID()
                          + Path.SEPARATOR + "work",
                          defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
@@ -1421,7 +1426,7 @@
       localJobConf.set("mapred.local.dir",
                        fConf.get("mapred.local.dir"));
             
-      localJobConf.set("mapred.task.id", task.getTaskId());
+      localJobConf.set("mapred.task.id", task.getTaskID().toString());
       keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
 
       // create _taskid directory in output path temporary directory.
@@ -1430,7 +1435,7 @@
         Path jobTmpDir = new Path(outputPath, MRConstants.TEMP_DIR_NAME);
         FileSystem fs = jobTmpDir.getFileSystem(localJobConf);
         if (fs.exists(jobTmpDir)) {
-          Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskId());
+          Path taskTmpDir = new Path(jobTmpDir, "_" + task.getTaskID());
           if (!fs.mkdirs(taskTmpDir)) {
             throw new IOException("Mkdirs failed to create " 
                                  + taskTmpDir.toString());
@@ -1465,7 +1470,7 @@
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       if (keepPattern != null) {
         alwaysKeepTaskFiles = 
-          Pattern.matches(keepPattern, task.getTaskId());
+          Pattern.matches(keepPattern, task.getTaskID().toString());
       } else {
         alwaysKeepTaskFiles = false;
       }
@@ -1520,7 +1525,7 @@
      */
     public synchronized void reportProgress(TaskStatus taskStatus) 
     {
-      LOG.info(task.getTaskId() + " " + taskStatus.getProgress() + 
+      LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
           "% " + taskStatus.getStateString());
       
       if (this.done || 
@@ -1528,7 +1533,7 @@
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
-        LOG.info(task.getTaskId() + " Ignoring status-update since " +
+        LOG.info(task.getTaskID() + " Ignoring status-update since " +
                  ((this.done) ? "task is 'done'" : 
                                 ("runState: " + this.taskStatus.getRunState()))
                  ); 
@@ -1583,7 +1588,7 @@
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
       
-      LOG.info("Task " + task.getTaskId() + " is done.");
+      LOG.info("Task " + task.getTaskID() + " is done.");
     }
 
     /**
@@ -1628,13 +1633,13 @@
               try {
                 // get task's stdout file 
                 taskStdout = FileUtil.makeShellPath(TaskLog.getTaskLogFile
-                                  (task.getTaskId(), TaskLog.LogName.STDOUT));
+                                  (task.getTaskID(), TaskLog.LogName.STDOUT));
                 // get task's stderr file 
                 taskStderr = FileUtil.makeShellPath(TaskLog.getTaskLogFile
-                                  (task.getTaskId(), TaskLog.LogName.STDERR));
+                                  (task.getTaskID(), TaskLog.LogName.STDERR));
                 // get task's syslog file 
                 taskSyslog = FileUtil.makeShellPath(TaskLog.getTaskLogFile
-                                  (task.getTaskId(), TaskLog.LogName.SYSLOG));
+                                  (task.getTaskID(), TaskLog.LogName.SYSLOG));
               } catch(IOException e){
                 LOG.warn("Exception finding task's stdout/err/syslog files");
               }
@@ -1642,17 +1647,17 @@
               try {
                 workDir = new File(lDirAlloc.getLocalPathToRead(
                                      TaskTracker.getJobCacheSubdir() 
-                                     + Path.SEPARATOR + task.getJobId() 
-                                     + Path.SEPARATOR + task.getTaskId()
+                                     + Path.SEPARATOR + task.getJobID() 
+                                     + Path.SEPARATOR + task.getTaskID()
                                      + Path.SEPARATOR + "work",
                                      localJobConf). toString());
               } catch (IOException e) {
-                LOG.warn("Working Directory of the task " + task.getTaskId() +
+                LOG.warn("Working Directory of the task " + task.getTaskID() +
                 		 "doesnt exist. Throws expetion " +
                           StringUtils.stringifyException(e));
               }
               // Build the command  
-              File stdout = TaskLog.getTaskLogFile(task.getTaskId(),
+              File stdout = TaskLog.getTaskLogFile(task.getTaskID(),
                                                    TaskLog.LogName.DEBUGOUT);
               // add pipes program as argument if it exists.
               String program ="";
@@ -1850,15 +1855,15 @@
       if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING || 
           taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
         // change status to failure
-        LOG.info("Reporting output lost:"+task.getTaskId());
+        LOG.info("Reporting output lost:"+task.getTaskID());
         taskStatus.setRunState(TaskStatus.State.FAILED);
         taskStatus.setProgress(0.0f);
         reportDiagnosticInfo("Map output lost, rescheduling: " + 
                              failure);
-        runningTasks.put(task.getTaskId(), this);
+        runningTasks.put(task.getTaskID(), this);
         mapTotal++;
       } else {
-        LOG.warn("Output already reported lost:"+task.getTaskId());
+        LOG.warn("Output already reported lost:"+task.getTaskID());
       }
     }
 
@@ -1871,7 +1876,7 @@
      * by locking tasktracker first and then locks the tip.
      */
     void cleanup() throws IOException {
-      String taskId = task.getTaskId();
+      TaskAttemptID taskId = task.getTaskID();
       LOG.debug("Cleaning up " + taskId);
       synchronized (TaskTracker.this) {
         tasks.remove(taskId);
@@ -1890,7 +1895,7 @@
           }
           defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR + 
                                           JOBCACHE + Path.SEPARATOR + 
-                                          task.getJobId() + 
+                                          task.getJobID() + 
                                           Path.SEPARATOR + taskId);
         } catch (Throwable ie) {
           LOG.info("Error cleaning up task runner: " + 
@@ -1899,14 +1904,16 @@
       }
     }
         
+    @Override
     public boolean equals(Object obj) {
       return (obj instanceof TaskInProgress) &&
-        task.getTaskId().equals
-        (((TaskInProgress) obj).getTask().getTaskId());
+        task.getTaskID().equals
+        (((TaskInProgress) obj).getTask().getTaskID());
     }
         
+    @Override
     public int hashCode() {
-      return task.getTaskId().hashCode();
+      return task.getTaskID().hashCode();
     }
   }
 
@@ -1917,7 +1924,7 @@
   /**
    * Called upon startup by the child process, to fetch Task data.
    */
-  public synchronized Task getTask(String taskid) throws IOException {
+  public synchronized Task getTask(TaskAttemptID taskid) throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       return tip.getTask();
@@ -1929,7 +1936,7 @@
   /**
    * Called periodically to report Task progress, from 0.0 to 1.0.
    */
-  public synchronized boolean statusUpdate(String taskid, 
+  public synchronized boolean statusUpdate(TaskAttemptID taskid, 
                                               TaskStatus taskStatus) 
   throws IOException {
     TaskInProgress tip = tasks.get(taskid);
@@ -1946,7 +1953,7 @@
    * Called when the task dies before completion, and we want to report back
    * diagnostic info
    */
-  public synchronized void reportDiagnosticInfo(String taskid, String info) throws IOException {
+  public synchronized void reportDiagnosticInfo(TaskAttemptID taskid, String info) throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
       tip.reportDiagnosticInfo(info);
@@ -1956,14 +1963,14 @@
   }
 
   /** Child checking to see if we're alive.  Normally does nothing.*/
-  public synchronized boolean ping(String taskid) throws IOException {
+  public synchronized boolean ping(TaskAttemptID taskid) throws IOException {
     return tasks.get(taskid) != null;
   }
 
   /**
    * The task is done.
    */
-  public synchronized void done(String taskid, boolean shouldPromote) 
+  public synchronized void done(TaskAttemptID taskid, boolean shouldPromote) 
   throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
@@ -1977,7 +1984,7 @@
   /** 
    * A reduce-task failed to shuffle the map-outputs. Kill the task.
    */  
-  public synchronized void shuffleError(String taskId, String message) 
+  public synchronized void shuffleError(TaskAttemptID taskId, String message) 
   throws IOException { 
     LOG.fatal("Task: " + taskId + " - Killed due to Shuffle Failure: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
@@ -1988,7 +1995,7 @@
   /** 
    * A child task had a local filesystem error. Kill the task.
    */  
-  public synchronized void fsError(String taskId, String message) 
+  public synchronized void fsError(TaskAttemptID taskId, String message) 
   throws IOException {
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
@@ -1996,8 +2003,8 @@
     purgeTask(tip, true);
   }
 
-  public TaskCompletionEvent[] getMapCompletionEvents(
-                                                      String jobId, int fromEventId, int maxLocs) throws IOException {
+  public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId
+      , int fromEventId, int maxLocs) throws IOException {
       
     TaskCompletionEvent[]mapEvents = TaskCompletionEvent.EMPTY_ARRAY;
     RunningJob rjob;
@@ -2021,7 +2028,7 @@
   /**
    * The task is no longer running.  It may not have completed successfully
    */
-  void reportTaskFinished(String taskid) {
+  void reportTaskFinished(TaskAttemptID taskid) {
     TaskInProgress tip;
     synchronized (this) {
       tip = tasks.get(taskid);
@@ -2040,7 +2047,7 @@
   /**
    * A completed map task's output has been lost.
    */
-  public synchronized void mapOutputLost(String taskid,
+  public synchronized void mapOutputLost(TaskAttemptID taskid,
                                          String errorMsg) throws IOException {
     TaskInProgress tip = tasks.get(taskid);
     if (tip != null) {
@@ -2054,14 +2061,14 @@
   *  The data structure for initializing a job
    */
   static class RunningJob{
-    private String jobid; 
+    private JobID jobid; 
     private Path jobFile;
     // keep this for later use
     Set<TaskInProgress> tasks;
     boolean localized;
     boolean keepJobFiles;
     FetchStatus f;
-    RunningJob(String jobid, Path jobFile) {
+    RunningJob(JobID jobid, Path jobFile) {
       this.jobid = jobid;
       localized = false;
       tasks = new HashSet<TaskInProgress>();
@@ -2073,7 +2080,7 @@
       return jobFile;
     }
       
-    String getJobId() {
+    JobID getJobID() {
       return jobid;
     }
       
@@ -2099,7 +2106,7 @@
       String host = args[0];
       int port = Integer.parseInt(args[1]);
       InetSocketAddress address = new InetSocketAddress(host, port);
-      String taskid = args[2];
+      TaskAttemptID taskid = TaskAttemptID.forName(args[2]);
       //set a very high idle timeout so that the connection is never closed
       defaultConf.setInt("ipc.client.connection.maxidletime", 60*60*1000);
       TaskUmbilicalProtocol umbilical =
@@ -2185,7 +2192,7 @@
    */
   synchronized List<TaskStatus> getNonRunningTasks() {
     List<TaskStatus> result = new ArrayList<TaskStatus>(tasks.size());
-    for(Map.Entry<String, TaskInProgress> task: tasks.entrySet()) {
+    for(Map.Entry<TaskAttemptID, TaskInProgress> task: tasks.entrySet()) {
       if (!runningTasks.containsKey(task.getKey())) {
         result.add(task.getValue().getStatus());
       }
@@ -2262,6 +2269,7 @@
    */
   public static class MapOutputServlet extends HttpServlet {
     private static final int MAX_BYTES_TO_READ = 64 * 1024;
+    @Override
     public void doGet(HttpServletRequest request, 
                       HttpServletResponse response
                       ) throws ServletException, IOException {
@@ -2369,7 +2377,7 @@
                            StringUtils.stringifyException(ie));
         log.warn(errorMsg);
         if (isInputException) {
-          tracker.mapOutputLost(mapId, errorMsg);
+          tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
         }
         response.sendError(HttpServletResponse.SC_GONE, errorMsg);
         shuffleMetrics.failedOutput();

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Wed Apr 30 05:25:05 2008
@@ -19,7 +19,6 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.lang.InterruptedException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 
@@ -35,16 +34,20 @@
    * Changed the version to 4, since we have replaced 
    *         TaskUmbilicalProtocol.progress(String, float, String, 
    *         org.apache.hadoop.mapred.TaskStatus.Phase, Counters) 
-   *         with {@link #statusUpdate(String, TaskStatus)}
+   *         with statusUpdate(String, TaskStatus)
+   * 
    * Version 5 changed counters representation for HADOOP-2248
    * Version 6 changes the TaskStatus representation for HADOOP-2208
    * Version 7 changes the done api (via HADOOP-3140). It now expects whether
    *           or not the task's output needs to be promoted.
+   * Version 8 changes {job|tip|task}id's to use their corresponding 
+   * objects rather than strings.
    * */
-  public static final long versionID = 7L;
+
+  public static final long versionID = 8L;
   
   /** Called when a child task process starts, to get its task.*/
-  Task getTask(String taskid) throws IOException;
+  Task getTask(TaskAttemptID taskid) throws IOException;
 
   /**
    * Report child's progress to parent.
@@ -55,7 +58,7 @@
    * @throws InterruptedException
    * @return True if the task is known
    */
-  boolean statusUpdate(String taskId, TaskStatus taskStatus) 
+  boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
   throws IOException, InterruptedException;
   
   /** Report error messages back to parent.  Calls should be sparing, since all
@@ -63,25 +66,25 @@
    *  @param taskid the id of the task involved
    *  @param trace the text to report
    */
-  void reportDiagnosticInfo(String taskid, String trace) throws IOException;
+  void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException;
 
   /** Periodically called by child to check if parent is still alive. 
    * @return True if the task is known
    */
-  boolean ping(String taskid) throws IOException;
+  boolean ping(TaskAttemptID taskid) throws IOException;
 
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this.
    * @param taskid task's id
    * @param shouldBePromoted whether to promote the task's output or not 
    */
-  void done(String taskid, boolean shouldBePromoted) throws IOException;
+  void done(TaskAttemptID taskid, boolean shouldBePromoted) throws IOException;
 
   /** Report that a reduce-task couldn't shuffle map-outputs.*/
-  void shuffleError(String taskId, String message) throws IOException;
+  void shuffleError(TaskAttemptID taskId, String message) throws IOException;
   
   /** Report that the task encountered a local filesystem error.*/
-  void fsError(String taskId, String message) throws IOException;
+  void fsError(TaskAttemptID taskId, String message) throws IOException;
 
   /** Called by a reduce task to get the map output locations for finished maps.
    *
@@ -91,7 +94,7 @@
    * @param maxLocs the max number of locations to fetch
    * @return an array of TaskCompletionEvent
    */
-  TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
+  TaskCompletionEvent[] getMapCompletionEvents(JobID jobId, 
                                                int fromIndex, int maxLocs) throws IOException;
 
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java Wed Apr 30 05:25:05 2008
@@ -19,18 +19,18 @@
 package org.apache.hadoop.mapred.jobcontrol;
 
 
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.StringUtils;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.io.IOException;
-
 /** This class encapsulates a MapReduce job and its dependency. It monitors 
  *  the states of the depending jobs and updates the state of this job.
  *  A job starts in the WAITING state. If it does not have any depending jobs, or
@@ -56,7 +56,7 @@
   private JobConf theJobConf;
   private int state;
   private String jobID; 		// assigned and used by JobControl class
-  private String mapredJobID; // the job ID assigned by map/reduce
+  private JobID mapredJobID; // the job ID assigned by map/reduce
   private String jobName;		// external name, assigned/used by client app
   private String message;		// some info for human consumption, 
   // e.g. the reason why the job failed
@@ -74,7 +74,7 @@
     this.dependingJobs = dependingJobs;
     this.state = Job.WAITING;
     this.jobID = "unassigned";
-    this.mapredJobID = "unassigned";
+    this.mapredJobID = null; //not yet assigned 
     this.jobName = "unassigned";
     this.message = "just initialized";
     this.jc = new JobClient(jobConf);
@@ -90,12 +90,14 @@
     this(jobConf, null);
   }
 	
+  @Override
   public String toString() {
     StringBuffer sb = new StringBuffer();
     sb.append("job name:\t").append(this.jobName).append("\n");
     sb.append("job id:\t").append(this.jobID).append("\n");
     sb.append("job state:\t").append(this.state).append("\n");
-    sb.append("job mapred id:\t").append(this.mapredJobID).append("\n");
+    sb.append("job mapred id:\t").append(this.mapredJobID==null ? "unassigned" 
+        : this.mapredJobID).append("\n");
     sb.append("job message:\t").append(this.message).append("\n");
 		
     if (this.dependingJobs == null || this.dependingJobs.size() == 0) {
@@ -104,7 +106,7 @@
       sb.append("job has ").append(this.dependingJobs.size()).append(" dependeng jobs:\n");
       for (int i = 0; i < this.dependingJobs.size(); i++) {
         sb.append("\t depending job ").append(i).append(":\t");
-        sb.append(((Job) this.dependingJobs.get(i)).getJobName()).append("\n");
+        sb.append((this.dependingJobs.get(i)).getJobName()).append("\n");
       }
     }
     return sb.toString();
@@ -126,7 +128,7 @@
   }
 	
   /**
-   * @return the job ID of this job
+   * @return the job ID of this job assigned by JobControl
    */
   public String getJobID() {
     return this.jobID;
@@ -142,20 +144,41 @@
 	
   /**
    * @return the mapred ID of this job
+   * @deprecated use {@link #getAssignedJobID()} instead
    */
+  @Deprecated
   public String getMapredJobID() {
-    return this.mapredJobID;
+    return this.mapredJobID.toString();
   }
 	
   /**
    * Set the mapred ID for this job.
    * @param mapredJobID the mapred job ID for this job.
+   * @deprecated use {@link #setAssignedJobID(JobID)} instead
    */
+  @Deprecated
   public void setMapredJobID(String mapredJobID) {
-    this.jobID = mapredJobID;
+    this.mapredJobID = JobID.forName(mapredJobID);
   }
 	
   /**
+   * @return the mapred ID of this job as assigned by the 
+   * mapred framework.
+   */
+  public JobID getAssignedJobID() {
+    return this.mapredJobID;
+  }
+  
+  /**
+   * Set the mapred ID for this job as assigned by the 
+   * mapred framework.
+   * @param mapredJobID the mapred job ID for this job.
+   */
+  public void setAssignedJobID(JobID mapredJobID) {
+    this.mapredJobID = mapredJobID;
+  }
+  
+  /**
    * @return the mapred job conf of this job
    */
   public JobConf getJobConf() {
@@ -304,7 +327,7 @@
     Job pred = null;
     int n = this.dependingJobs.size();
     for (int i = 0; i < n; i++) {
-      pred = (Job) this.dependingJobs.get(i);
+      pred = this.dependingJobs.get(i);
       int s = pred.checkState();
       if (s == Job.WAITING || s == Job.READY || s == Job.RUNNING) {
         break; // a pred is still not completed, continue in WAITING
@@ -345,7 +368,7 @@
         }
       }
       RunningJob running = jc.submitJob(theJobConf);
-      this.mapredJobID = running.getJobID();
+      this.mapredJobID = running.getID();
       this.state = Job.RUNNING;
     } catch (IOException ioe) {
       this.state = Job.FAILED;
@@ -353,12 +376,4 @@
     }
   }
 	
-  /**
-   * @param args
-   */
-  public static void main(String[] args) {
-    // TODO Auto-generated method stub
-
-  }
-
 }

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java Wed Apr 30 05:25:05 2008
@@ -36,6 +36,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -80,7 +81,7 @@
     FileUtil.chmod(executable, "a+x");
     cmd.add(executable);
     // wrap the command in a stdout/stderr capture
-    String taskid = conf.get("mapred.task.id");
+    TaskAttemptID taskid = TaskAttemptID.forName(conf.get("mapred.task.id"));
     File stdout = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDOUT);
     File stderr = TaskLog.getTaskLogFile(taskid, TaskLog.LogName.STDERR);
     long logLength = TaskLog.getTaskLogLength(conf);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java Wed Apr 30 05:25:05 2008
@@ -17,15 +17,17 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.Path;
-
-import java.io.*;
-import java.util.Properties;
 
 public class TestJobStatusPersistency extends ClusterMapReduceTestCase {
-  private String runJob() throws Exception {
+  private JobID runJob() throws Exception {
     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
     Writer wr = new OutputStreamWriter(os);
     wr.write("hello1\n");
@@ -53,11 +55,11 @@
 
     FileOutputFormat.setOutputPath(conf, getOutputDir());
 
-    return JobClient.runJob(conf).getJobID();
+    return JobClient.runJob(conf).getID();
   }
 
   public void testNonPersistency() throws Exception {
-    String jobId = runJob();
+    JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj = jc.getJob(jobId);
     assertNotNull(rj);
@@ -74,7 +76,7 @@
     config.setProperty("mapred.job.tracker.persist.jobstatus.hours", "1");
     stopCluster();
     startCluster(false, config);
-    String jobId = runJob();
+    JobID jobId = runJob();
     JobClient jc = new JobClient(createJobConf());
     RunningJob rj0 = jc.getJob(jobId);
     assertNotNull(rj0);
@@ -96,7 +98,7 @@
     TaskCompletionEvent[] events1 = rj1.getTaskCompletionEvents(0);
     assertEquals(events0.length, events1.length);    
     for (int i = 0; i < events0.length; i++) {
-      assertEquals(events0[i].getTaskId(), events1[i].getTaskId());
+      assertEquals(events0[i].getTaskID(), events1[i].getTaskID());
       assertEquals(events0[i].getTaskStatus(), events1[i].getTaskStatus());
     }
   }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java Wed Apr 30 05:25:05 2008
@@ -17,14 +17,25 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.lib.*;
+import java.io.BufferedWriter;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.Random;
+
 import junit.framework.TestCase;
-import java.io.*;
-import java.util.*;
 
-import static org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.SequenceFile.CompressionType;
+import org.apache.hadoop.mapred.lib.IdentityMapper;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
 
 /**********************************************************
  * MapredLoadTest generates a bunch of work that exercises
@@ -255,15 +266,14 @@
   private static class MyReduce extends IdentityReducer {
     private JobConf conf;
     private boolean compressInput;
-    private String taskId;
-    private String jobId;
+    private TaskAttemptID taskId;
     private boolean first = true;
       
+    @Override
     public void configure(JobConf conf) {
       this.conf = conf;
       compressInput = conf.getCompressMapOutput();
-      taskId = conf.get("mapred.task.id");
-      jobId = conf.get("mapred.job.id");
+      taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
     }
       
     public void reduce(WritableComparable key, Iterator values,
@@ -271,7 +281,7 @@
                        ) throws IOException {
       if (first) {
         first = false;
-        MapOutputFile mapOutputFile = new MapOutputFile(jobId);
+        MapOutputFile mapOutputFile = new MapOutputFile(taskId.getJobID());
         mapOutputFile.setConf(conf);
         Path input = mapOutputFile.getInputFile(0, taskId);
         FileSystem fs = FileSystem.get(conf);

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java Wed Apr 30 05:25:05 2008
@@ -18,20 +18,25 @@
 
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.MRCaching.TestResult;
-import org.apache.hadoop.util.Progressable;
-
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
-import java.io.IOException;
 import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
+
 import junit.framework.TestCase;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.mapred.MRCaching.TestResult;
+import org.apache.hadoop.util.Progressable;
+
 /**
  * A JUnit test to test min map-reduce cluster with local file system.
  */
@@ -64,7 +69,7 @@
       assertTrue("Failed test archives not matching", ret.isOutputOk);
       // test the task report fetchers
       JobClient client = new JobClient(job);
-      String jobid = ret.job.getJobID();
+      JobID jobid = ret.job.getID();
       TaskReport[] reports = client.getMapTaskReports(jobid);
       assertEquals("number of maps", 1, reports.length);
       reports = client.getReduceTaskReports(jobid);
@@ -247,7 +252,7 @@
         // expected result
       }
       while (values.hasNext()) {
-        Writable value = (Writable) values.next();
+        Writable value = values.next();
         System.out.println("reduce: " + key + ", " + value);
         output.collect(key, value);
       }

Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java Wed Apr 30 05:25:05 2008
@@ -17,23 +17,25 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.*;
-import java.util.*;
-import java.net.URISyntaxException;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URI;
+import java.net.URISyntaxException;
+
 import junit.framework.TestCase;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.MiniDFSCluster;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.lib.IdentityReducer;
-import org.apache.hadoop.filecache.DistributedCache; 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.dfs.MiniDFSCluster;
 
 /**
  * Class to test mapred debug Script
@@ -67,7 +69,7 @@
    * @return task log as string
    * @throws IOException
    */
-  public static String readTaskLog(TaskLog.LogName  filter, String taskId)
+  public static String readTaskLog(TaskLog.LogName  filter, TaskAttemptID taskId)
   throws IOException {
     // string buffer to store task log
     StringBuffer result = new StringBuffer();
@@ -161,9 +163,9 @@
     	e.printStackTrace();
     }
 
-    String jobId = job.getJobID();
+    JobID jobId = job.getID();
     // construct the task id of first map task of failmap
-    String taskId = "task_" + jobId.substring(4) + "_m_000000_0";
+    TaskAttemptID taskId = new TaskAttemptID(new TaskID(jobId,true, 0), 0);
     // wait for the job to finish.
     while (!job.isComplete()) ;
     



Mime
View raw message