hadoop-common-commits mailing list archives

From: d...@apache.org
Subject: svn commit: r652364 [1/4] - in /hadoop/core/trunk: ./ src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/ src/contrib/streaming/src/java/org/apache/hadoop/streaming/ src/docs/src/documentation/content/xdocs/ src/java/org/apache/hadoop/...
Date: Wed, 30 Apr 2008 12:25:06 GMT
Author: ddas
Date: Wed Apr 30 05:25:05 2008
New Revision: 652364

URL: http://svn.apache.org/viewvc?rev=652364&view=rev
Log:
HADOOP-544. This issue introduces new classes JobID, TaskID and TaskAttemptID, which should be used instead of their string counterparts. Functions in JobClient, TaskReport, RunningJob, jobcontrol.Job and TaskCompletionEvent that use string arguments are deprecated in favor of the corresponding ones that use ID objects. Applications can use xxxID.toString() and xxxID.forName() to convert IDs to and from strings. Contributed by Enis Soztutar.
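
A minimal sketch of the round-trip these classes enable (class and method names as introduced by this commit; the ID strings are made-up examples):

    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class IdRoundTrip {
      public static void main(String[] args) {
        // Restore a typed ID from its string form ...
        JobID job = JobID.forName("job_200709221812_0001");
        // ... and back again; forName(id.toString()) yields an equal ID.
        System.out.println(job.equals(JobID.forName(job.toString())));  // true

        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_200709221812_0001_m_000000_0");
        // An attempt ID carries its enclosing job ID, so callers no longer
        // need to thread a separate job-id string around.
        System.out.println(attempt.getJobID());  // job_200709221812_0001
      }
    }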

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
    hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
    hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/KillJobAction.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/KillTaskAction.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputFile.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapOutputLocation.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/RunningJob.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/StatusHttpServer.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/Task.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLog.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogAppender.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskLogServlet.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskReport.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskRunner.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/jobcontrol/Job.java
    hadoop/core/trunk/src/java/org/apache/hadoop/mapred/pipes/Application.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestJobStatusPersistency.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMapRed.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRLocalFS.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRMapRedDebugScript.java
    hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
    hadoop/core/trunk/src/webapps/job/jobblacklistedtrackers.jsp
    hadoop/core/trunk/src/webapps/job/jobconf.jsp
    hadoop/core/trunk/src/webapps/job/jobdetails.jsp
    hadoop/core/trunk/src/webapps/job/jobfailures.jsp
    hadoop/core/trunk/src/webapps/job/jobtasks.jsp
    hadoop/core/trunk/src/webapps/job/jobtracker.jsp
    hadoop/core/trunk/src/webapps/job/taskdetails.jsp
    hadoop/core/trunk/src/webapps/job/taskstats.jsp
    hadoop/core/trunk/src/webapps/task/tasktracker.jsp

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Wed Apr 30 05:25:05 2008
@@ -22,7 +22,14 @@
     is not supported. If upgrading from 0.13 or earlier is required,
     please upgrade to an intermediate version (0.14-0.17) and then
     to this version. (rangadi)
- 
+
+    HADOOP-544. This issue introduces new classes JobID, TaskID and
+    TaskAttemptID, which should be used instead of their string
+    counterparts. Functions in JobClient, TaskReport, RunningJob,
+    jobcontrol.Job and TaskCompletionEvent that use string arguments are
+    deprecated in favor of the corresponding ones that use ID objects.
+    Applications can use xxxID.toString() and xxxID.forName() to convert
+    IDs to and from strings. (Enis Soztutar via ddas)
 
   NEW FEATURES
 

Modified: hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java (original)
+++ hadoop/core/trunk/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinJob.java Wed Apr 30 05:25:05 2008
@@ -34,6 +34,7 @@
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapred.JobID;
 
 /**
  * This class implements the main function for creating a map/reduce
@@ -127,7 +128,7 @@
     RunningJob running = null;
     try {
       running = jc.submitJob(job);
-      String jobId = running.getJobID();
+      JobID jobId = running.getID();
       System.out.println("Job " + jobId + " is submitted");
       while (!running.isComplete()) {
         System.out.println("Job " + jobId + " is still running.");

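The submit-and-poll idiom in the DataJoinJob hunk above, restated outside the diff as a short sketch (assumes a configured JobConf named job; exception handling elided):

    JobClient jc = new JobClient(job);
    RunningJob running = jc.submitJob(job);
    JobID jobId = running.getID();          // typed ID instead of a String
    System.out.println("Job " + jobId + " is submitted");
    while (!running.isComplete()) {
      System.out.println("Job " + jobId + " is still running.");
      Thread.sleep(5000);
      running = jc.getJob(jobId);           // re-fetch status by JobID
    }

getJob(JobID) is the typed lookup added to JobClient in this same commit.
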
Modified: hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java (original)
+++ hadoop/core/trunk/src/contrib/streaming/src/java/org/apache/hadoop/streaming/StreamJob.java Wed Apr 30 05:25:05 2008
@@ -32,7 +32,12 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 
-import org.apache.commons.cli2.*; 
+import org.apache.commons.cli2.Argument;
+import org.apache.commons.cli2.CommandLine;
+import org.apache.commons.cli2.Group;
+import org.apache.commons.cli2.Option;
+import org.apache.commons.cli2.OptionException;
+import org.apache.commons.cli2.WriteableCommandLine;
 import org.apache.commons.cli2.builder.ArgumentBuilder;
 import org.apache.commons.cli2.builder.DefaultOptionBuilder;
 import org.apache.commons.cli2.builder.GroupBuilder;
@@ -42,29 +47,28 @@
 import org.apache.commons.cli2.util.HelpFormatter;
 import org.apache.commons.cli2.validation.InvalidArgumentException;
 import org.apache.commons.cli2.validation.Validator;
-import org.apache.commons.logging.*;
-
-import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
-import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-
-import org.apache.hadoop.io.Text;
+import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.Path;
-
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.InvalidJobConfException;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.KeyValueTextInputFormat;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.TextInputFormat;
 import org.apache.hadoop.mapred.TextOutputFormat;
-import org.apache.hadoop.filecache.*;
-import org.apache.hadoop.util.*;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
+import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
+import org.apache.hadoop.util.StringUtils;
 
 /** All the client-side work happens here.
  * (Jar packaging, MapRed job submission and monitoring)
@@ -638,7 +642,7 @@
     Iterator it = userJobConfProps_.keySet().iterator();
     while (it.hasNext()) {
       String key = (String) it.next();
-      String val = (String)userJobConfProps_.get(key);
+      String val = userJobConfProps_.get(key);
       boolean earlyName = key.equals("fs.default.name");
       earlyName |= key.equals("stream.shipped.hadoopstreaming");
       if (doEarlyProps == earlyName) {
@@ -919,7 +923,7 @@
     String lastReport = null;
     try {
       running_ = jc_.submitJob(jobConf_);
-      jobId_ = running_.getJobID();
+      jobId_ = running_.getID();
 
       LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
       LOG.info("Running job: " + jobId_);
@@ -984,12 +988,14 @@
       this.optionString = optionString;
     }
 
+    @Override
     public boolean canProcess(final WriteableCommandLine commandLine,
                               final String argument) {
       boolean ret = (argument != null) && argument.startsWith(optionString);
         
       return ret;
     }    
+    @Override
     public void process(final WriteableCommandLine commandLine,
                         final ListIterator arguments) throws OptionException {
       final String arg = (String) arguments.next();
@@ -1070,7 +1076,7 @@
   protected long minRecWrittenToEnableSkip_;
 
   protected RunningJob running_;
-  protected String jobId_;
+  protected JobID jobId_;
   protected static String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
     "Please specify a different link name for all of your caching URIs";
 }

Modified: hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml (original)
+++ hadoop/core/trunk/src/docs/src/documentation/content/xdocs/mapred_tutorial.xml Wed Apr 30 05:25:05 2008
@@ -1320,7 +1320,7 @@
           example, speculative tasks) trying to open and/or write to the same 
           file (path) on the <code>FileSystem</code>. Hence the 
           application-writer will have to pick unique names per task-attempt 
-          (using the taskid, say <code>task_200709221812_0001_m_000000_0</code>), 
+          (using the attemptid, say <code>attempt_200709221812_0001_m_000000_0</code>), 
           not just per task.</p> 
  
           <p>To avoid these issues the Map-Reduce framework maintains a special 

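The unique per-attempt naming the tutorial text describes might look like this inside a running task (a sketch; assumes the task's JobConf is available as conf and an output directory Path as outputDir):

    TaskAttemptID attempt = TaskAttemptID.forName(conf.get("mapred.task.id"));
    // Unique per attempt, not just per task, so two speculative instances
    // of the same TIP never open the same path on the FileSystem.
    Path sideFile = new Path(outputDir, "part-" + attempt);
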
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/CompletedJobStatusStore.java Wed Apr 30 05:25:05 2008
@@ -17,12 +17,16 @@
  */  
 package org.apache.hadoop.mapred;
 
+import java.io.IOException;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.*;
-
-import java.io.IOException;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Persists and retrieves the Job info of a job into/from DFS.
@@ -118,7 +122,7 @@
     }
   }
 
-  private Path getInfoFilePath(String jobId) {
+  private Path getInfoFilePath(JobID jobId) {
     return new Path(jobInfoDir, jobId + ".info");
   }
   
@@ -129,7 +133,7 @@
    */
   public void store(JobInProgress job) {
     if (active && retainTime > 0) {
-      String jobId = job.getStatus().getJobId();
+      JobID jobId = job.getStatus().getJobID();
       Path jobStatusFile = getInfoFilePath(jobId);
       try {
         FSDataOutputStream dataOut = fs.create(jobStatusFile);
@@ -161,7 +165,7 @@
     }
   }
 
-  private FSDataInputStream getJobInfoFile(String jobId) throws IOException {
+  private FSDataInputStream getJobInfoFile(JobID jobId) throws IOException {
     Path jobStatusFile = getInfoFilePath(jobId);
     return (fs.exists(jobStatusFile)) ? fs.open(jobStatusFile) : null;
   }
@@ -213,7 +217,7 @@
    * @param jobId the jobId for which jobStatus is queried
    * @return JobStatus object, null if not able to retrieve
    */
-  public JobStatus readJobStatus(String jobId) {
+  public JobStatus readJobStatus(JobID jobId) {
     JobStatus jobStatus = null;
     if (active) {
       try {
@@ -236,7 +240,7 @@
    * @param jobId the jobId for which jobProfile is queried
    * @return JobProfile object, null if not able to retrieve
    */
-  public JobProfile readJobProfile(String jobId) {
+  public JobProfile readJobProfile(JobID jobId) {
     JobProfile jobProfile = null;
     if (active) {
       try {
@@ -260,7 +264,7 @@
    * @param jobId the jobId for which Counters is queried
    * @return Counters object, null if not able to retrieve
    */
-  public Counters readCounters(String jobId) {
+  public Counters readCounters(JobID jobId) {
     Counters counters = null;
     if (active) {
       try {
@@ -287,7 +291,7 @@
    * @param maxEvents   max number of events
    * @return TaskCompletionEvent[], empty array if not able to retrieve
    */
-  public TaskCompletionEvent[] readJobTaskCompletionEvents(String jobId,
+  public TaskCompletionEvent[] readJobTaskCompletionEvents(JobID jobId,
                                                                int fromEventId,
                                                                int maxEvents) {
     TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/FileOutputFormat.java Wed Apr 30 05:25:05 2008
@@ -157,8 +157,8 @@
    * <p>In such cases there could be issues with 2 instances of the same TIP 
    * (running simultaneously e.g. speculative tasks) trying to open/write-to the
    * same file (path) on HDFS. Hence the application-writer will have to pick 
-   * unique names per task-attempt (e.g. using the taskid, say 
-   * <tt>task_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
+   * unique names per task-attempt (e.g. using the attemptid, say 
+   * <tt>attempt_200709221812_0001_m_000000_0</tt>), not just per TIP.</p> 
    * 
    * <p>To get around this the Map-Reduce framework helps the application-writer 
    * out by maintaining a special 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Apr 30 05:25:05 2008
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.*;
+import java.io.IOException;
 
 import org.apache.hadoop.ipc.VersionedProtocol;
 
@@ -40,8 +40,9 @@
    * Version 8: HeartbeatResponse is added with the next heartbeat interval.
    * version 9 changes the counter representation for HADOOP-2248
    * version 10 changes the TaskStatus representation for HADOOP-2208
+   * version 11 changes string to JobID in getTaskCompletionEvents().
    */
-  public static final long versionID = 10L;
+  public static final long versionID = 11L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;
@@ -97,8 +98,8 @@
    * @return array of task completion events. 
    * @throws IOException
    */
-  TaskCompletionEvent[] getTaskCompletionEvents(
-                                                String jobid, int fromEventId, int maxEvents) throws IOException;
+  TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid, int fromEventId
+      , int maxEvents) throws IOException;
   
 }
 

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Wed Apr 30 05:25:05 2008
@@ -44,27 +44,27 @@
       return TaskUmbilicalProtocol.versionID;
     }
     
-    public void done(String taskid, boolean shouldPromote) throws IOException {
+    public void done(TaskAttemptID taskid, boolean shouldPromote) throws IOException {
       LOG.info("Task " + taskid + " reporting done.");
     }
 
-    public void fsError(String taskId, String message) throws IOException {
+    public void fsError(TaskAttemptID taskId, String message) throws IOException {
       LOG.info("Task " + taskId + " reporting file system error: " + message);
     }
 
-    public void shuffleError(String taskId, String message) throws IOException {
+    public void shuffleError(TaskAttemptID taskId, String message) throws IOException {
       LOG.info("Task " + taskId + " reporting shuffle error: " + message);
     }
 
-    public Task getTask(String taskid) throws IOException {
+    public Task getTask(TaskAttemptID taskid) throws IOException {
       return null;
     }
 
-    public boolean ping(String taskid) throws IOException {
+    public boolean ping(TaskAttemptID taskid) throws IOException {
       return true;
     }
 
-    public boolean statusUpdate(String taskId, TaskStatus taskStatus) 
+    public boolean statusUpdate(TaskAttemptID taskId, TaskStatus taskStatus) 
     throws IOException, InterruptedException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskId);
@@ -81,11 +81,11 @@
       return true;
     }
 
-    public void reportDiagnosticInfo(String taskid, String trace) throws IOException {
+    public void reportDiagnosticInfo(TaskAttemptID taskid, String trace) throws IOException {
       LOG.info("Task " + taskid + " has problem " + trace);
     }
     
-    public TaskCompletionEvent[] getMapCompletionEvents(String jobId, 
+    public TaskCompletionEvent[] getMapCompletionEvents(JobID jobId, 
                                                         int fromEventId, int maxLocs) throws IOException {
       return TaskCompletionEvent.EMPTY_ARRAY;
     }
@@ -116,14 +116,13 @@
    * @param conf the jobconf
    * @throws IOException if something goes wrong writing
    */
-  private static void fillInMissingMapOutputs(FileSystem fs,
-                                              String jobId,
-                                              String taskId,
+  private static void fillInMissingMapOutputs(FileSystem fs, 
+                                              TaskAttemptID taskId,
                                               int numMaps,
                                               JobConf conf) throws IOException {
     Class keyClass = conf.getMapOutputKeyClass();
     Class valueClass = conf.getMapOutputValueClass();
-    MapOutputFile namer = new MapOutputFile(jobId);
+    MapOutputFile namer = new MapOutputFile(taskId.getJobID());
     namer.setConf(conf);
     for(int i=0; i<numMaps; i++) {
       Path f = namer.getInputFile(i, taskId);
@@ -151,9 +150,8 @@
       System.exit(1);
     }
     JobConf conf = new JobConf(new Path(jobFilename.toString()));
-    String taskId = conf.get("mapred.task.id");
+    TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
     boolean isMap = conf.getBoolean("mapred.task.is.map", true);
-    String jobId = conf.get("mapred.job.id");
     int partition = conf.getInt("mapred.task.partition", 0);
     
     // setup the local and user working directories
@@ -161,7 +159,7 @@
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
     File workDirName = new File(lDirAlloc.getLocalPathToRead(
                                   TaskTracker.getJobCacheSubdir() 
-                                  + Path.SEPARATOR + jobId 
+                                  + Path.SEPARATOR + taskId.getJobID() 
                                   + Path.SEPARATOR + taskId
                                   + Path.SEPARATOR + "work",
                                   conf). toString());
@@ -182,13 +180,11 @@
       BytesWritable split = new BytesWritable();
       split.readFields(splitFile);
       splitFile.close();
-      task = new MapTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), 
-                         taskId, partition, splitClass, split);
+      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split);
     } else {
       int numMaps = conf.getNumMapTasks();
-      fillInMissingMapOutputs(local, jobId, taskId, numMaps, conf);
-      task = new ReduceTask(jobId, jobFilename.toString(), conf.get("mapred.tip.id"), taskId, 
-                            partition, numMaps);
+      fillInMissingMapOutputs(local, taskId, numMaps, conf);
+      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps);
     }
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());

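The key simplification in the IsolationRunner hunk above: the job ID is now derived from the attempt ID instead of being read from a second configuration key. As a standalone sketch (config key name as in the code above):

    TaskAttemptID taskId = TaskAttemptID.forName(conf.get("mapred.task.id"));
    JobID jobId = taskId.getJobID();   // replaces conf.get("mapred.job.id")
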
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobClient.java Wed Apr 30 05:25:05 2008
@@ -64,8 +64,6 @@
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.mapred.TaskInProgress;
-import org.apache.hadoop.mapred.DefaultJobHistoryParser.*;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
@@ -181,7 +179,7 @@
      */
     public NetworkedJob(JobStatus job) throws IOException {
       this.status = job;
-      this.profile = jobSubmitClient.getJobProfile(job.getJobId());
+      this.profile = jobSubmitClient.getJobProfile(job.getJobID());
       this.statustime = System.currentTimeMillis();
     }
 
@@ -191,7 +189,7 @@
      */
     synchronized void ensureFreshStatus() throws IOException {
       if (System.currentTimeMillis() - statustime > MAX_JOBPROFILE_AGE) {
-        this.status = jobSubmitClient.getJobStatus(profile.getJobId());
+        this.status = jobSubmitClient.getJobStatus(profile.getJobID());
         this.statustime = System.currentTimeMillis();
       }
     }
@@ -199,8 +197,15 @@
     /**
      * An identifier for the job
      */
+    public JobID getID() {
+      return profile.getJobID();
+    }
+    
+    /** @deprecated This method is deprecated and will be removed. Applications should 
+     * rather use {@link #getID()}.*/
+    @Deprecated
     public String getJobID() {
-      return profile.getJobId();
+      return profile.getJobID().toString();
     }
     
     /**
@@ -275,7 +280,7 @@
      * Tells the service to terminate the current job.
      */
     public synchronized void killJob() throws IOException {
-      jobSubmitClient.killJob(getJobID());
+      jobSubmitClient.killJob(getID());
     }
     
     /**
@@ -284,17 +289,23 @@
      * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
      * it is just killed, w/o affecting job failure status.
      */
-    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
+    public synchronized void killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException {
       jobSubmitClient.killTask(taskId, shouldFail);
     }
 
+    /** @deprecated Applications should rather use {@link #killTask(TaskAttemptID, boolean)}*/
+    @Deprecated
+    public synchronized void killTask(String taskId, boolean shouldFail) throws IOException {
+      killTask(TaskAttemptID.forName(taskId), shouldFail);
+    }
+    
     /**
      * Fetch task completion events from jobtracker for this job. 
      */
     public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
                                                                       int startFrom) throws IOException{
       return jobSubmitClient.getTaskCompletionEvents(
-                                                     getJobID(), startFrom, 10); 
+                                                     getID(), startFrom, 10); 
     }
 
     /**
@@ -306,7 +317,7 @@
         ensureFreshStatus();
       } catch (IOException e) {
       }
-      return "Job: " + profile.getJobId() + "\n" + 
+      return "Job: " + profile.getJobID() + "\n" + 
         "file: " + profile.getJobFile() + "\n" + 
         "tracking URL: " + profile.getURL() + "\n" + 
         "map() completion: " + status.mapProgress() + "\n" + 
@@ -317,7 +328,7 @@
      * Returns the counters for this job
      */
     public Counters getCounters() throws IOException {
-      return jobSubmitClient.getJobCounters(getJobID());
+      return jobSubmitClient.getJobCounters(getID());
     }
   }
 
@@ -693,8 +704,8 @@
      * configure the command line options correctly on the submitting dfs
      */
     
-    String jobId = jobSubmitClient.getNewJobId();
-    Path submitJobDir = new Path(job.getSystemDir(), jobId);
+    JobID jobId = jobSubmitClient.getNewJobId();
+    Path submitJobDir = new Path(job.getSystemDir(), jobId.toString());
     Path submitJarFile = new Path(submitJobDir, "job.jar");
     Path submitSplitFile = new Path(submitJobDir, "job.split");
     configureCommandLineOptions(job, submitJobDir, submitJarFile);
@@ -874,7 +885,7 @@
    *         <code>jobid</code> doesn't correspond to any known job.
    * @throws IOException
    */
-  public RunningJob getJob(String jobid) throws IOException {
+  public RunningJob getJob(JobID jobid) throws IOException {
     JobStatus status = jobSubmitClient.getJobStatus(jobid);
     if (status != null) {
       return new NetworkedJob(status);
@@ -883,6 +894,13 @@
     }
   }
 
+  /**@deprecated Applications should rather use {@link #getJob(JobID)}. 
+   */
+  @Deprecated
+  public RunningJob getJob(String jobid) throws IOException {
+    return getJob(JobID.forName(jobid));
+  }
+  
   /**
    * Get the information of the current state of the map tasks of a job.
    * 
@@ -890,10 +908,16 @@
    * @return the list of all of the map tips.
    * @throws IOException
    */
-  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
+  public TaskReport[] getMapTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getMapTaskReports(jobId);
   }
-    
+  
+  /**@deprecated Applications should rather use {@link #getMapTaskReports(JobID)}*/
+  @Deprecated
+  public TaskReport[] getMapTaskReports(String jobId) throws IOException {
+    return getMapTaskReports(JobID.forName(jobId));
+  }
+  
   /**
    * Get the information of the current state of the reduce tasks of a job.
    * 
@@ -901,10 +925,16 @@
    * @return the list of all of the reduce tips.
    * @throws IOException
    */    
-  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
+  public TaskReport[] getReduceTaskReports(JobID jobId) throws IOException {
     return jobSubmitClient.getReduceTaskReports(jobId);
   }
    
+  /**@deprecated Applications should rather use {@link #getReduceTaskReports(JobID)}*/
+  @Deprecated
+  public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
+    return getReduceTaskReports(JobID.forName(jobId));
+  }
+  
   /**
    * Get status information about the Map-Reduce cluster.
    *  
@@ -933,7 +963,7 @@
                                        "&plaintext=true&filter=profile"
                                        ).openConnection();
     InputStream in = connection.getInputStream();
-    OutputStream out = new FileOutputStream(e.getTaskId() + ".profile");
+    OutputStream out = new FileOutputStream(e.getTaskID() + ".profile");
     IOUtils.copyBytes(in, out, 64 * 1024, true);
   }
 
@@ -971,7 +1001,7 @@
     }
     try {
       running = jc.submitJob(job);
-      String jobId = running.getJobID();
+      JobID jobId = running.getID();
       LOG.info("Running job: " + jobId);
       int eventCounter = 0;
       boolean profiling = job.getProfileEnabled();
@@ -1015,7 +1045,7 @@
               if (event.getTaskStatus() == 
                 TaskCompletionEvent.Status.SUCCEEDED){
                 LOG.info(event.toString());
-                displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
               }
               break; 
             case FAILED:
@@ -1023,18 +1053,16 @@
                 TaskCompletionEvent.Status.FAILED){
                 LOG.info(event.toString());
                 // Displaying the task diagnostic information
-                String taskId = event.getTaskId();
-                String tipId = TaskInProgress.getTipId(taskId);
+                TaskAttemptID taskId = event.getTaskID();
                 String[] taskDiagnostics = 
-                  jc.jobSubmitClient.getTaskDiagnostics(jobId, tipId, 
-                                                        taskId); 
+                  jc.jobSubmitClient.getTaskDiagnostics(taskId); 
                 if (taskDiagnostics != null) {
                   for(String diagnostics : taskDiagnostics){
                     System.err.println(diagnostics);
                   }
                 }
                 // Displaying the task logs
-                displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+                displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
               }
               break; 
             case KILLED:
@@ -1044,7 +1072,7 @@
               break; 
             case ALL:
               LOG.info(event.toString());
-              displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
+              displayTaskLogs(event.getTaskID(), event.getTaskTrackerHttp());
               break;
             }
           }
@@ -1073,7 +1101,7 @@
     return running;
   }
 
-  private static void displayTaskLogs(String taskId, String baseUrl)
+  private static void displayTaskLogs(TaskAttemptID taskId, String baseUrl)
     throws IOException {
     // The tasktracker for a 'failed/killed' job might not be around...
     if (baseUrl != null) {
@@ -1085,7 +1113,7 @@
     }
   }
     
-  private static void getTaskLogs(String taskId, URL taskLogUrl, 
+  private static void getTaskLogs(TaskAttemptID taskId, URL taskLogUrl, 
                                   OutputStream out) {
     try {
       URLConnection connection = taskLogUrl.openConnection();
@@ -1274,10 +1302,10 @@
     try {
       if (submitJobFile != null) {
         RunningJob job = submitJob(conf);
-        System.out.println("Created job " + job.getJobID());
+        System.out.println("Created job " + job.getID());
         exitCode = 0;
       } else if (getStatus) {
-        RunningJob job = getJob(jobid);
+        RunningJob job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -1287,7 +1315,7 @@
           exitCode = 0;
         }
       } else if (killJob) {
-        RunningJob job = getJob(jobid);
+        RunningJob job = getJob(JobID.forName(jobid));
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
@@ -1299,7 +1327,7 @@
         viewHistory(outputDir, viewAllHistory);
         exitCode = 0;
       } else if (listEvents) {
-        listEvents(jobid, fromEvent, nEvents);
+        listEvents(JobID.forName(jobid), fromEvent, nEvents);
         exitCode = 0;
       } else if (listJobs) {
         listJobs();
@@ -1308,7 +1336,7 @@
           listAllJobs();
           exitCode = 0;
       } else if(killTask) {
-        if(jobSubmitClient.killTask(taskid, false)) {
+        if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), false)) {
           System.out.println("Killed task " + taskid);
           exitCode = 0;
         } else {
@@ -1316,7 +1344,7 @@
           exitCode = -1;
         }
       } else if(failTask) {
-        if(jobSubmitClient.killTask(taskid, true)) {
+        if(jobSubmitClient.killTask(TaskAttemptID.forName(taskid), true)) {
           System.out.println("Killed task " + taskid + " by failing it");
           exitCode = 0;
         } else {
@@ -1342,7 +1370,7 @@
    * @param jobId the job id for the job's events to list
    * @throws IOException
    */
-  private void listEvents(String jobId, int fromEventId, int numEvents)
+  private void listEvents(JobID jobId, int fromEventId, int numEvents)
     throws IOException {
     TaskCompletionEvent[] events = 
       jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
@@ -1350,7 +1378,7 @@
     System.out.println("Number of events (from " + fromEventId + 
                        ") are: " + events.length);
     for(TaskCompletionEvent event: events) {
-      System.out.println(event.getTaskStatus() + " " + event.getTaskId() + 
+      System.out.println(event.getTaskStatus() + " " + event.getTaskID() + 
                          " " + event.getTaskTrackerHttp());
     }
   }
@@ -1367,7 +1395,7 @@
     System.out.printf("%d jobs currently running\n", jobs.length);
     System.out.printf("JobId\tState\tStartTime\tUserName\n");
     for (JobStatus job : jobs) {
-      System.out.printf("%s\t%d\t%d\t%s\n", job.getJobId(), job.getRunState(),
+      System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(),
           job.getStartTime(), job.getUsername());
     }
   }
@@ -1386,7 +1414,7 @@
                        "\tFailed : 3\tPrep : 4\n");
     System.out.printf("JobId\tState\tStartTime\tUserName\n");
     for (JobStatus job : jobs) {
-      System.out.printf("%s\t%d\t%d\t%s\n", job.getJobId(), job.getRunState(),
+      System.out.printf("%s\t%d\t%d\t%s\n", job.getJobID(), job.getRunState(),
           job.getStartTime(), job.getUsername());
     }
   }

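How a caller migrates off the deprecated String overloads shown in the JobClient hunk, as a brief sketch (assumes a JobClient named jc; the job-ID string is a made-up example):

    // before: string-based lookup, now deprecated
    TaskReport[] maps = jc.getMapTaskReports("job_200709221812_0001");
    // after: parse once, then use the typed API everywhere
    JobID id = JobID.forName("job_200709221812_0001");
    TaskReport[] typedMaps = jc.getMapTaskReports(id);
    RunningJob job = jc.getJob(id);
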
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Apr 30 05:25:05 2008
@@ -101,7 +101,7 @@
       int retryAttempts = conf.getInt("job.end.retry.attempts", 0) + 1;
       long retryInterval = conf.getInt("job.end.retry.interval", 30000);
       if (uri.contains("$jobId")) {
-        uri = uri.replace("$jobId", status.getJobId());
+        uri = uri.replace("$jobId", status.getJobID().toString());
       }
       if (uri.contains("$jobStatus")) {
         String statusStr =
@@ -214,6 +214,7 @@
       return (int)(delayTime - ((JobEndStatusInfo)d).delayTime);
     }
 
+    @Override
     public boolean equals(Object o) {
       if (!(o instanceof JobEndStatusInfo)) {
         return false;
@@ -224,10 +225,12 @@
       return false;
     }
 
+    @Override
     public int hashCode() {
       return 37 * 17 + (int) (delayTime^(delayTime>>>32));
     }
       
+    @Override
     public String toString() {
       return "URL: " + uri + " remaining retries: " + retryAttempts +
         " interval: " + retryInterval;

Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobHistory.java Wed Apr 30 05:25:05 2008
@@ -22,7 +22,6 @@
 import java.io.File;
 import java.io.FileFilter;
 import java.io.FileOutputStream;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.PrintWriter;
@@ -31,7 +30,6 @@
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.regex.Matcher;
@@ -346,7 +344,7 @@
      * @param jobId id of the job
      * @return the path of the job file on the local file system 
      */
-    public static String getLocalJobFilePath(String jobId){
+    public static String getLocalJobFilePath(JobID jobId){
       return System.getProperty("hadoop.log.dir") + File.separator +
                jobId + "_conf.xml";
     }
@@ -430,7 +428,7 @@
      * @param submitTime time when job tracker received the job
      * @throws IOException
      */
-    public static void logSubmitted(String jobId, JobConf jobConf, 
+    public static void logSubmitted(JobID jobId, JobConf jobConf, 
                                     String jobConfPath, long submitTime) 
     throws IOException {
       FileSystem fs = null;
@@ -502,7 +500,7 @@
           //add to writer as well 
           JobHistory.log(writers, RecordTypes.Job, 
                          new Keys[]{Keys.JOBID, Keys.JOBNAME, Keys.USER, Keys.SUBMIT_TIME, Keys.JOBCONF }, 
-                         new String[]{jobId, jobName, user, 
+                         new String[]{jobId.toString(), jobName, user, 
                                       String.valueOf(submitTime) , jobConfPath}
                         ); 
              
@@ -585,7 +583,7 @@
      * @param totalMaps total maps assigned by jobtracker. 
      * @param totalReduces total reduces. 
      */
-    public static void logStarted(String jobId, long startTime, int totalMaps, int totalReduces){
+    public static void logStarted(JobID jobId, long startTime, int totalMaps, int totalReduces){
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobId; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -593,7 +591,7 @@
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job, 
                          new Keys[] {Keys.JOBID, Keys.LAUNCH_TIME, Keys.TOTAL_MAPS, Keys.TOTAL_REDUCES },
-                         new String[] {jobId,  String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)}); 
+                         new String[] {jobId.toString(),  String.valueOf(startTime), String.valueOf(totalMaps), String.valueOf(totalReduces)}); 
         }
       }
     }
@@ -607,7 +605,7 @@
      * @param failedReduces no of failed reduce tasks. 
      * @param counters the counters from the job
      */ 
-    public static void logFinished(String jobId, long finishTime, 
+    public static void logFinished(JobID jobId, long finishTime, 
                                    int finishedMaps, int finishedReduces,
                                    int failedMaps, int failedReduces,
                                    Counters counters){
@@ -623,7 +621,7 @@
                                      Keys.FINISHED_REDUCES,
                                      Keys.FAILED_MAPS, Keys.FAILED_REDUCES,
                                      Keys.COUNTERS},
-                         new String[] {jobId,  Long.toString(finishTime), 
+                         new String[] {jobId.toString(),  Long.toString(finishTime), 
                                        Values.SUCCESS.name(), 
                                        String.valueOf(finishedMaps), 
                                        String.valueOf(finishedReduces),
@@ -646,7 +644,7 @@
      * @param finishedMaps no finished map tasks. 
      * @param finishedReduces no of finished reduce tasks. 
      */
-    public static void logFailed(String jobid, long timestamp, int finishedMaps, int finishedReduces){
+    public static void logFailed(JobID jobid, long timestamp, int finishedMaps, int finishedReduces){
       if (!disableHistory){
         String logFileKey =  JOBTRACKER_UNIQUE_STRING + jobid; 
         ArrayList<PrintWriter> writer = openJobs.get(logFileKey); 
@@ -654,7 +652,7 @@
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Job,
                          new Keys[] {Keys.JOBID, Keys.FINISH_TIME, Keys.JOB_STATUS, Keys.FINISHED_MAPS, Keys.FINISHED_REDUCES },
-                         new String[] {jobid,  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
+                         new String[] {jobid.toString(),  String.valueOf(timestamp), Values.FAILED.name(), String.valueOf(finishedMaps), 
                                        String.valueOf(finishedReduces)}); 
           for (PrintWriter out : writer) {
             out.close();
@@ -674,43 +672,41 @@
 
     /**
      * Log start time of task (TIP).
-     * @param jobId job id
      * @param taskId task id
      * @param taskType MAP or REDUCE
      * @param startTime startTime of tip. 
      */
-    public static void logStarted(String jobId, String taskId, String taskType, 
+    public static void logStarted(TaskID taskId, String taskType, 
                                   long startTime){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE , Keys.START_TIME}, 
-                         new String[]{taskId, taskType, String.valueOf(startTime)});
+                         new String[]{taskId.toString(), taskType, String.valueOf(startTime)});
         }
       }
     }
     /**
      * Log finish time of task. 
-     * @param jobId job id
      * @param taskId task id
      * @param taskType MAP or REDUCE
      * @param finishTime finish timeof task in ms
      */
-    public static void logFinished(String jobId, String taskId, String taskType, 
+    public static void logFinished(TaskID taskId, String taskType, 
                                    long finishTime, Counters counters){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
                                     Keys.TASK_STATUS, Keys.FINISH_TIME,
                                     Keys.COUNTERS}, 
-                         new String[]{ taskId, taskType, Values.SUCCESS.name(), 
+                         new String[]{ taskId.toString(), taskType, Values.SUCCESS.name(), 
                                        String.valueOf(finishTime),
                                        counters.makeCompactString()});
         }
@@ -718,22 +714,21 @@
     }
     /**
      * Log job failed event.
-     * @param jobId jobid
      * @param taskId task id
      * @param taskType MAP or REDUCE.
      * @param time timestamp when job failed detected. 
      * @param error error message for failure. 
      */
-    public static void logFailed(String jobId, String taskId, String taskType, long time, String error){
+    public static void logFailed(TaskID taskId, String taskType, long time, String error){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.Task, 
                          new Keys[]{Keys.TASKID, Keys.TASK_TYPE, 
                                     Keys.TASK_STATUS, Keys.FINISH_TIME, Keys.ERROR}, 
-                         new String[]{ taskId,  taskType, Values.FAILED.name(), String.valueOf(time) , error});
+                         new String[]{ taskId.toString(),  taskType, Values.FAILED.name(), String.valueOf(time) , error});
         }
       }
     }
@@ -757,48 +752,44 @@
   public static class MapAttempt extends TaskAttempt{
     /**
      * Log start time of this map task attempt. 
-     * @param jobId job id
-     * @param taskId task id
      * @param taskAttemptId task attempt id
      * @param startTime start time of task attempt as reported by task tracker. 
      * @param hostName host name of the task attempt. 
      */
-    public static void logStarted(String jobId, String taskId, String taskAttemptId, long startTime, String hostName){
+    public static void logStarted(TaskAttemptID taskAttemptId, long startTime, String hostName){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.START_TIME, 
                                      Keys.HOSTNAME},
-                         new String[]{Values.MAP.name(),  taskId, 
-                                      taskAttemptId, String.valueOf(startTime), hostName}); 
+                         new String[]{Values.MAP.name(),  taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), String.valueOf(startTime), hostName}); 
         }
       }
     }
     /**
      * Log finish time of map task attempt. 
-     * @param jobId job id
-     * @param taskId task id
      * @param taskAttemptId task attempt id 
      * @param finishTime finish time
      * @param hostName host name 
      */
-    public static void logFinished(String jobId, String taskId, 
-                                   String taskAttemptId, long finishTime, 
+    public static void logFinished(TaskAttemptID taskAttemptId, long finishTime, 
                                    String hostName){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{ Keys.TASK_TYPE, Keys.TASKID, 
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.FINISH_TIME, Keys.HOSTNAME},
-                         new String[]{Values.MAP.name(), taskId, taskAttemptId, Values.SUCCESS.name(),  
+                         new String[]{Values.MAP.name(), taskAttemptId.getTaskID().toString(),
+                                      taskAttemptId.toString(), Values.SUCCESS.name(),  
                                       String.valueOf(finishTime), hostName}); 
         }
       }
@@ -806,48 +797,46 @@
 
     /**
      * Log task attempt failed event.  
-     * @param jobId jobid
-     * @param taskId taskid
      * @param taskAttemptId task attempt id
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      */
-    public static void logFailed(String jobId, String taskId, String taskAttemptId, 
+    public static void logFailed(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.FAILED.name(),
+                         new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(),
+                                       taskAttemptId.toString(), Values.FAILED.name(),
                                        String.valueOf(timestamp), hostName, error}); 
         }
       }
     }
     /**
      * Log task attempt killed event.  
-     * @param jobId jobid
-     * @param taskId taskid
      * @param taskAttemptId task attempt id
      * @param timestamp timestamp
      * @param hostName hostname of this task attempt.
      * @param error error message if any for this task attempt. 
      */
-    public static void logKilled(String jobId, String taskId, String taskAttemptId, 
+    public static void logKilled(TaskAttemptID taskAttemptId, 
                                  long timestamp, String hostName, String error){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.MapAttempt, 
                          new Keys[]{Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                     Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR},
-                         new String[]{ Values.MAP.name(), taskId, taskAttemptId, Values.KILLED.name(),
+                         new String[]{ Values.MAP.name(), taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(), Values.KILLED.name(),
                                        String.valueOf(timestamp), hostName, error}); 
         }
       }
@@ -860,44 +849,39 @@
   public static class ReduceAttempt extends TaskAttempt{
     /**
      * Log start time of  Reduce task attempt. 
-     * @param jobId job id
-     * @param taskId task id (tip)
      * @param taskAttemptId task attempt id
      * @param startTime start time
      * @param hostName host name 
      */
-    public static void logStarted(String jobId, String taskId, String taskAttemptId, 
+    public static void logStarted(TaskAttemptID taskAttemptId, 
                                   long startTime, String hostName){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, 
                                       Keys.TASK_ATTEMPT_ID, Keys.START_TIME, Keys.HOSTNAME},
-                         new String[]{Values.REDUCE.name(),  taskId, 
-                                      taskAttemptId, String.valueOf(startTime), hostName}); 
+                         new String[]{Values.REDUCE.name(),  taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), String.valueOf(startTime), hostName}); 
         }
       }
     }
     /**
      * Log finished event of this task. 
-     * @param jobId job id
-     * @param taskId task id
      * @param taskAttemptId task attempt id
      * @param shuffleFinished shuffle finish time
      * @param sortFinished sort finish time
      * @param finishTime finish time of task
      * @param hostName host name where task attempt executed
      */
-    public static void logFinished(String jobId, String taskId, 
-                                   String taskAttemptId, long shuffleFinished, 
+    public static void logFinished(TaskAttemptID taskAttemptId, long shuffleFinished, 
                                    long sortFinished, long finishTime, 
                                    String hostName){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -905,7 +889,8 @@
                                      Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                      Keys.SHUFFLE_FINISHED, Keys.SORT_FINISHED,
                                      Keys.FINISH_TIME, Keys.HOSTNAME},
-                         new String[]{Values.REDUCE.name(),  taskId, taskAttemptId, Values.SUCCESS.name(), 
+                         new String[]{Values.REDUCE.name(),  taskAttemptId.getTaskID().toString(), 
+                                      taskAttemptId.toString(), Values.SUCCESS.name(), 
                                       String.valueOf(shuffleFinished), String.valueOf(sortFinished),
                                       String.valueOf(finishTime), hostName}); 
         }
@@ -913,42 +898,39 @@
     }
     /**
      * Log failed reduce task attempt. 
-     * @param jobId job id 
-     * @param taskId task id
      * @param taskAttemptId task attempt id
      * @param timestamp time stamp when task failed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
      */
-    public static void logFailed(String jobId, String taskId, String taskAttemptId, long timestamp, 
+    public static void logFailed(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
                          new Keys[]{  Keys.TASK_TYPE, Keys.TASKID, Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, Keys.ERROR },
-                         new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.FAILED.name(), 
+                         new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(), Values.FAILED.name(), 
                                        String.valueOf(timestamp), hostName, error }); 
         }
       }
     }
     /**
      * Log killed reduce task attempt. 
-     * @param jobId job id 
-     * @param taskId task id
      * @param taskAttemptId task attempt id
      * @param timestamp time stamp when the task was killed
      * @param hostName host name of the task attempt.  
      * @param error error message of the task. 
      */
-    public static void logKilled(String jobId, String taskId, String taskAttemptId, long timestamp, 
+    public static void logKilled(TaskAttemptID taskAttemptId, long timestamp, 
                                  String hostName, String error){
       if (!disableHistory){
         ArrayList<PrintWriter> writer = openJobs.get(JOBTRACKER_UNIQUE_STRING 
-                                                     + jobId); 
+                                                     + taskAttemptId.getJobID()); 
 
         if (null != writer){
           JobHistory.log(writer, RecordTypes.ReduceAttempt, 
@@ -956,7 +938,8 @@
                                       Keys.TASK_ATTEMPT_ID, Keys.TASK_STATUS, 
                                       Keys.FINISH_TIME, Keys.HOSTNAME, 
                                       Keys.ERROR },
-                         new String[]{ Values.REDUCE.name(), taskId, taskAttemptId, Values.KILLED.name(), 
+                         new String[]{ Values.REDUCE.name(), taskAttemptId.getTaskID().toString(), 
+                                       taskAttemptId.toString(), Values.KILLED.name(), 
                                        String.valueOf(timestamp), hostName, error }); 
         }
       }

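The pattern in the attempt-logging methods above is uniform: the old (jobId, taskId, taskAttemptId) string triple becomes a single TaskAttemptID, and the enclosing task and job ids are recovered with getTaskID() and getJobID(). Below is a minimal sketch of that containment; the constructor shapes, the jobtracker identifier, and the sequence numbers are illustrative assumptions, not confirmed by this patch.

    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.TaskAttemptID;
    import org.apache.hadoop.mapred.TaskID;

    public class IdContainmentSketch {
      public static void main(String[] args) {
        // Assumed constructors: jobtracker identifier, then sequence numbers.
        JobID job = new JobID("200804300525", 1);
        TaskID tip = new TaskID(job, true, 3);             // map tip #3 of the job
        TaskAttemptID attempt = new TaskAttemptID(tip, 0); // first attempt of the tip

        // The rewritten log methods derive both outer ids from the attempt alone:
        System.out.println(attempt.getTaskID().equals(tip));  // true
        System.out.println(attempt.getJobID().equals(job));   // true

        // History records still store strings, so the round-trip must hold:
        TaskAttemptID restored = TaskAttemptID.forName(attempt.toString());
        System.out.println(restored.equals(attempt));         // true
      }
    }
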
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Apr 30 05:25:05 2008
@@ -23,9 +23,9 @@
 import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
@@ -42,7 +42,8 @@
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.net.*;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.Node;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -126,7 +127,7 @@
   boolean tasksInited = false;
 
   private LocalFileSystem localFs;
-  private String jobId;
+  private JobID jobId;
   private boolean hasSpeculativeMaps;
   private boolean hasSpeculativeReduces;
 
@@ -148,14 +149,14 @@
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
   
   // Map of mapTaskId -> no. of fetch failures
-  private Map<String, Integer> mapTaskIdToFetchFailuresMap =
-    new TreeMap<String, Integer>();
+  private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
+    new TreeMap<TaskAttemptID, Integer>();
   
   /**
    * Create a JobInProgress with the given job file, plus a handle
    * to the tracker.
    */
-  public JobInProgress(String jobid, JobTracker jobtracker, 
+  public JobInProgress(JobID jobid, JobTracker jobtracker, 
                        JobConf default_conf) throws IOException {
     this.jobId = jobid;
     String url = "http://" + jobtracker.getJobTrackerMachine() + ":" 
@@ -199,7 +200,7 @@
     this.jobMetrics.setTag("user", conf.getUser());
     this.jobMetrics.setTag("sessionId", conf.getSessionId());
     this.jobMetrics.setTag("jobName", conf.getJobName());
-    this.jobMetrics.setTag("jobId", jobid);
+    this.jobMetrics.setTag("jobId", jobid.toString());
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
@@ -218,10 +219,9 @@
     Counters counters = getCounters();
     for (Counters.Group group : counters) {
       jobMetrics.setTag("group", group.getDisplayName());
-          
       for (Counters.Counter counter : group) {
         jobMetrics.setTag("counter", counter.getDisplayName());
-        jobMetrics.setMetric("value", (float) counter.getCounter());
+        jobMetrics.setMetric("value", counter.getCounter());
         jobMetrics.update();
       }
     }
@@ -329,9 +329,9 @@
       status.setReduceProgress(1.0f);
       status.setRunState(JobStatus.SUCCEEDED);
       tasksInited = true;
-      JobHistory.JobInfo.logStarted(profile.getJobId(), 
+      JobHistory.JobInfo.logStarted(profile.getJobID(), 
                                     System.currentTimeMillis(), 0, 0);
-      JobHistory.JobInfo.logFinished(profile.getJobId(), 
+      JobHistory.JobInfo.logFinished(profile.getJobID(), 
                                      System.currentTimeMillis(), 0, 0, 0, 0,
                                      getCounters());
       // Special case because the Job is not queued
@@ -361,10 +361,10 @@
       }
     }
 
-    this.status = new JobStatus(status.getJobId(), 0.0f, 0.0f, JobStatus.RUNNING);
+    this.status = new JobStatus(status.getJobID(), 0.0f, 0.0f, JobStatus.RUNNING);
     tasksInited = true;
         
-    JobHistory.JobInfo.logStarted(profile.getJobId(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
+    JobHistory.JobInfo.logStarted(profile.getJobID(), System.currentTimeMillis(), numMapTasks, numReduceTasks);
   }
 
   /////////////////////////////////////////////////////
@@ -482,7 +482,7 @@
 
       if (state == TaskStatus.State.COMMIT_PENDING) {
         JobWithTaskContext j = new JobWithTaskContext(this, tip, 
-                                                      status.getTaskId(),
+                                                      status.getTaskID(),
                                                       metrics);
         jobtracker.addToCommitQueue(j);
       }
@@ -496,14 +496,14 @@
         }
         httpTaskLogLocation = "http://" + host + ":" + 
           ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
-          status.getTaskId();
+          status.getTaskID();
       }
 
       TaskCompletionEvent taskEvent = null;
       if (state == TaskStatus.State.SUCCEEDED) {
         taskEvent = new TaskCompletionEvent(
                                             taskCompletionEventTracker, 
-                                            status.getTaskId(),
+                                            status.getTaskID(),
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
@@ -527,12 +527,12 @@
         if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
           TaskCompletionEvent t = 
             this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskId().equals(status.getTaskId()))
+          if (t.getTaskID().equals(status.getTaskID()))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
+        failedTask(tip, status.getTaskID(), status, status.getTaskTracker(),
                    wasRunning, wasComplete, metrics);
 
         // Did the task failure lead to tip failure?
@@ -544,7 +544,7 @@
           taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
         }
         taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
-                                            status.getTaskId(),
+                                            status.getTaskID(),
                                             tip.idWithinJob(),
                                             status.getIsMap(),
                                             taskCompletionStatus, 
@@ -651,11 +651,10 @@
                                             int clusterSize
                                            ) throws IOException {
     if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobId());
+      LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-    
-    
+        
     int target = findNewMapTask(tts, clusterSize, status.mapProgress());
     if (target == -1) {
       return null;
@@ -664,11 +663,9 @@
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningMapTasks += 1;
-
       boolean wasRunning = maps[target].isRunning();
       if (!wasRunning) {
-        JobHistory.Task.logStarted(profile.getJobId(), 
-                                   maps[target].getTIPId(), Values.MAP.name(),
+        JobHistory.Task.logStarted(maps[target].getTIPId(), Values.MAP.name(),
                                    System.currentTimeMillis());
       }
 
@@ -687,7 +684,7 @@
                                                int clusterSize
                                               ) throws IOException {
     if (!tasksInited) {
-      LOG.info("Cannot create task split for " + profile.getJobId());
+      LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
 
@@ -699,11 +696,9 @@
     Task result = reduces[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       runningReduceTasks += 1;
-
       boolean wasRunning = reduces[target].isRunning();
       if (!wasRunning) {
-        JobHistory.Task.logStarted(profile.getJobId(), 
-                                   reduces[target].getTIPId(), Values.REDUCE.name(),
+        JobHistory.Task.logStarted(reduces[target].getTIPId(), Values.REDUCE.name(),
                                    System.currentTimeMillis());
       }
 
@@ -1286,7 +1281,7 @@
                                          TaskStatus status,
                                          JobTrackerMetrics metrics) 
   {
-    String taskid = status.getTaskId();
+    TaskAttemptID taskid = status.getTaskID();
         
     // Sanity check: is the TIP already complete? 
     // It _is_ safe to not decrement running{Map|Reduce}Tasks and
@@ -1314,24 +1309,20 @@
     // Update jobhistory 
     String taskTrackerName = status.getTaskTracker();
     if (status.getIsMap()){
-      JobHistory.MapAttempt.logStarted(profile.getJobId(), 
-                                       tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                                        taskTrackerName); 
-      JobHistory.MapAttempt.logFinished(profile.getJobId(), 
-                                        tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
+      JobHistory.MapAttempt.logFinished(status.getTaskID(), status.getFinishTime(), 
                                         taskTrackerName); 
-      JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+      JobHistory.Task.logFinished(tip.getTIPId(), 
                                   Values.MAP.name(), status.getFinishTime(),
                                   status.getCounters()); 
     }else{
-      JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
-                                          tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+      JobHistory.ReduceAttempt.logStarted( status.getTaskID(), status.getStartTime(), 
                                           taskTrackerName); 
-      JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
-                                           tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
+      JobHistory.ReduceAttempt.logFinished(status.getTaskID(), status.getShuffleFinishTime(),
                                            status.getSortFinishTime(), status.getFinishTime(), 
                                            taskTrackerName); 
-      JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+      JobHistory.Task.logFinished(tip.getTIPId(), 
                                   Values.REDUCE.name(), status.getFinishTime(),
                                   status.getCounters()); 
     }
@@ -1394,9 +1385,9 @@
       this.status.setReduceProgress(1.0f);
       this.finishTime = System.currentTimeMillis();
       garbageCollect();
-      LOG.info("Job " + this.status.getJobId() + 
+      LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
-      JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
+      JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks, failedMapTasks, 
                                      failedReduceTasks, getCounters());
@@ -1411,8 +1402,8 @@
    */
   public synchronized void kill() {
     if (status.getRunState() != JobStatus.FAILED) {
-      LOG.info("Killing job '" + this.status.getJobId() + "'");
-      this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
+      LOG.info("Killing job '" + this.status.getJobID() + "'");
+      this.status = new JobStatus(status.getJobID(), 1.0f, 1.0f, JobStatus.FAILED);
       this.finishTime = System.currentTimeMillis();
       this.runningMapTasks = 0;
       this.runningReduceTasks = 0;
@@ -1425,7 +1416,7 @@
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
-      JobHistory.JobInfo.logFailed(this.status.getJobId(), finishTime, 
+      JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
                                    this.finishedMapTasks, this.finishedReduceTasks);
       garbageCollect();
     }
@@ -1443,7 +1434,7 @@
    * we need to schedule reexecution so that downstream reduce tasks can 
    * obtain the map task's output.
    */
-  private void failedTask(TaskInProgress tip, String taskid, 
+  private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, String trackerName,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
@@ -1495,29 +1486,23 @@
     // update job history
     String taskTrackerName = status.getTaskTracker();
     if (status.getIsMap()) {
-      JobHistory.MapAttempt.logStarted(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                 taskTrackerName);
       if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+        JobHistory.MapAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
                 taskTrackerName, status.getDiagnosticInfo());
       } else {
-        JobHistory.MapAttempt.logKilled(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+        JobHistory.MapAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
                 taskTrackerName, status.getDiagnosticInfo());
       }
     } else {
-      JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+      JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
                 taskTrackerName);
       if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), System.currentTimeMillis(),
                 taskTrackerName, status.getDiagnosticInfo());
       } else {
-        JobHistory.ReduceAttempt.logKilled(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), System.currentTimeMillis(),
+        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), System.currentTimeMillis(),
                 taskTrackerName, status.getDiagnosticInfo());
       }
     }
@@ -1558,14 +1543,14 @@
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
       
       if (killJob) {
-        LOG.info("Aborting job " + profile.getJobId());
-        JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
+        LOG.info("Aborting job " + profile.getJobID());
+        JobHistory.Task.logFailed(tip.getTIPId(), 
                                   tip.isMapTask() ? 
                                           Values.MAP.name() : 
                                           Values.REDUCE.name(),  
                                   System.currentTimeMillis(), 
                                   status.getDiagnosticInfo());
-        JobHistory.JobInfo.logFailed(profile.getJobId(), 
+        JobHistory.JobInfo.logFailed(profile.getJobID(), 
                                      System.currentTimeMillis(), 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks
@@ -1593,7 +1578,7 @@
    * @param reason The reason that the task failed
    * @param trackerName The task tracker the task failed on
    */
-  public void failedTask(TaskInProgress tip, String taskid, String reason, 
+  public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
                          TaskStatus.Phase phase, TaskStatus.State state, 
                          String trackerName, JobTrackerMetrics metrics) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
@@ -1605,7 +1590,7 @@
                                                     trackerName, phase,
                                                     null);
     updateTaskStatus(tip, status, metrics);
-    JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
+    JobHistory.Task.logFailed(tip.getTIPId(), 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               System.currentTimeMillis(), reason); 
   }
@@ -1638,7 +1623,7 @@
         
       // Delete temp dfs dirs created if any, like in case of 
       // speculative execution of reduces.  
-      Path tempDir = new Path(conf.getSystemDir(), jobId); 
+      Path tempDir = new Path(conf.getSystemDir(), jobId.toString()); 
       fs.delete(tempDir, true); 
 
       // delete the temporary directory in output directory
@@ -1651,7 +1636,7 @@
         }
       }
     } catch (IOException e) {
-      LOG.warn("Error cleaning up "+profile.getJobId()+": "+e);
+      LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
     }
     
     cleanUpMetrics();
@@ -1665,8 +1650,8 @@
   /**
    * Return the TaskInProgress that matches the tipid.
    */
-  public TaskInProgress getTaskInProgress(String tipid){
-    if (TaskInProgress.isMapId(tipid)) {
+  public TaskInProgress getTaskInProgress(TaskID tipid){
+    if (tipid.isMap()) {
       for (int i = 0; i < maps.length; i++) {
         if (tipid.equals(maps[i].getTIPId())){
           return maps[i];
@@ -1712,7 +1697,7 @@
   }
   
   synchronized void fetchFailureNotification(TaskInProgress tip, 
-                                             String mapTaskId, 
+                                             TaskAttemptID mapTaskId, 
                                              String trackerName, 
                                              JobTrackerMetrics metrics) {
     Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
@@ -1743,10 +1728,10 @@
   static class JobWithTaskContext {
     private JobInProgress job;
     private TaskInProgress tip;
-    private String taskId;
+    private TaskAttemptID taskId;
     private JobTrackerMetrics metrics;
     JobWithTaskContext(JobInProgress job, TaskInProgress tip, 
-        String taskId, JobTrackerMetrics metrics) {
+        TaskAttemptID taskId, JobTrackerMetrics metrics) {
       this.job = job;
       this.tip = tip;
       this.taskId = taskId;
@@ -1758,7 +1743,7 @@
     TaskInProgress getTIP() {
       return tip;
     }
-    String getTaskId() {
+    TaskAttemptID getTaskID() {
       return taskId;
     }
     JobTrackerMetrics getJobTrackerMetrics() {

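One detail worth noting in the JobInProgress changes: mapTaskIdToFetchFailuresMap stays a TreeMap while its key type changes from String to TaskAttemptID, which only works if TaskAttemptID sorts consistently, i.e. implements Comparable. A hedged sketch of the counting idiom under that assumption; noteFetchFailure is an illustrative name, not a method in this patch.

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class FetchFailureSketch {
      // TreeMap orders its keys, so this assumes TaskAttemptID is Comparable,
      // a property String provided for free before this change.
      private final Map<TaskAttemptID, Integer> failures =
        new TreeMap<TaskAttemptID, Integer>();
      private static final int MAX_NOTIFICATIONS = 3; // mirrors the constant above

      // Illustrative method, not part of the patch: count a fetch failure and
      // report whether the map attempt should now be failed.
      synchronized boolean noteFetchFailure(TaskAttemptID mapTaskId) {
        Integer n = failures.get(mapTaskId);
        n = (n == null) ? 1 : n + 1;
        failures.put(mapTaskId, n);
        return n >= MAX_NOTIFICATIONS;
      }
    }
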
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobProfile.java Wed Apr 30 05:25:05 2008
@@ -17,10 +17,15 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.io.*;
-
-import java.io.*;
-import java.net.*;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
 
 /**************************************************
  * A JobProfile is a MapReduce primitive.  Tracks a job,
@@ -38,7 +43,7 @@
   }
 
   String user;
-  String jobid;
+  JobID jobid;
   String jobFile;
   String url;
   String name;
@@ -59,7 +64,7 @@
    * @param url link to the web-ui for details of the job.
    * @param name user-specified job name.
    */
-  public JobProfile(String user, String jobid, String jobFile, String url,
+  public JobProfile(String user, JobID jobid, String jobFile, String url,
                     String name) {
     this.user = user;
     this.jobid = jobid;
@@ -78,7 +83,7 @@
   /**
    * Get the job id.
    */
-  public String getJobId() {
+  public JobID getJobID() {
     return jobid;
   }
 
@@ -111,18 +116,18 @@
   // Writable
   ///////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, jobid);
-    UTF8.writeString(out, jobFile);
-    UTF8.writeString(out, url);
-    UTF8.writeString(out, user);
-    UTF8.writeString(out, name);
+    jobid.write(out);
+    Text.writeString(out, jobFile);
+    Text.writeString(out, url);
+    Text.writeString(out, user);
+    Text.writeString(out, name);
   }
   public void readFields(DataInput in) throws IOException {
-    this.jobid = UTF8.readString(in);
-    this.jobFile = UTF8.readString(in);
-    this.url = UTF8.readString(in);
-    this.user = UTF8.readString(in);
-    this.name = UTF8.readString(in);
+    this.jobid = JobID.read(in);
+    this.jobFile = Text.readString(in);
+    this.url = Text.readString(in);
+    this.user = Text.readString(in);
+    this.name = Text.readString(in);
   }
 }
 

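The JobProfile wire format changes in two independent ways: the job id now serializes through the JobID object itself, and the remaining strings move from UTF8 to Text, so version 7 and version 8 peers cannot read each other's records. A round-trip sketch using only the calls that appear in the diff (jobid.write, JobID.read, Text.writeString/readString); the JobID constructor arguments are assumptions.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobID;

    public class ProfileWireSketch {
      public static void main(String[] args) throws IOException {
        JobID jobid = new JobID("200804300525", 1);   // assumed constructor
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        jobid.write(out);                   // the JobID writes its own fields
        Text.writeString(out, "job.xml");   // Text replaces UTF8 for plain strings

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        JobID restored = JobID.read(in);    // static factory used in readFields
        String jobFile = Text.readString(in);
        System.out.println(restored.equals(jobid) && "job.xml".equals(jobFile));
      }
    }
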
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobStatus.java Wed Apr 30 05:25:05 2008
@@ -21,7 +21,7 @@
 import java.io.DataOutput;
 import java.io.IOException;
 
-import org.apache.hadoop.io.UTF8;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
@@ -46,7 +46,7 @@
   public static final int FAILED = 3;
   public static final int PREP = 4;
 
-  private String jobid;
+  private JobID jobid;
   private float mapProgress;
   private float reduceProgress;
   private int runState;
@@ -65,7 +65,7 @@
    * @param reduceProgress The progress made on the reduces
    * @param runState The current state of the job
    */
-  public JobStatus(String jobid, float mapProgress, float reduceProgress, int runState) {
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress, int runState) {
     this.jobid = jobid;
     this.mapProgress = mapProgress;
     this.reduceProgress = reduceProgress;
@@ -76,7 +76,7 @@
   /**
    * @return The jobid of the Job
    */
-  public String getJobId() { return jobid; }
+  public JobID getJobID() { return jobid; }
     
   /**
    * @return Percentage of progress in maps 
@@ -141,20 +141,20 @@
   // Writable
   ///////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, jobid);
+    jobid.write(out);
     out.writeFloat(mapProgress);
     out.writeFloat(reduceProgress);
     out.writeInt(runState);
     out.writeLong(startTime);
-    UTF8.writeString(out, user);
+    Text.writeString(out, user);
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.jobid = UTF8.readString(in);
+    this.jobid = JobID.read(in);
     this.mapProgress = in.readFloat();
     this.reduceProgress = in.readFloat();
     this.runState = in.readInt();
     this.startTime = in.readLong();
-    this.user = UTF8.readString(in);
+    this.user = Text.readString(in);
   }
 }

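JobStatus now carries a JobID end to end, and getJobId() becomes getJobID(); callers keep the typed id and convert with toString() only at string boundaries such as logging, the same pattern the JobInProgress changes follow. A short sketch; JobID.forName and the exact id string format are assumptions here.

    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobStatus;

    public class StatusSketch {
      public static void main(String[] args) {
        // The id string is illustrative; forName rejects malformed input.
        JobID id = JobID.forName("job_200804300525_0001");
        JobStatus status = new JobStatus(id, 0.0f, 0.0f, JobStatus.RUNNING);
        JobID same = status.getJobID();      // was: String getJobId()
        System.out.println("Job " + same);   // toString() only at the boundary
      }
    }
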
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=652364&r1=652363&r2=652364&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobSubmissionProtocol.java Wed Apr 30 05:25:05 2008
@@ -28,7 +28,7 @@
  * the current system status.
  */ 
 public interface JobSubmissionProtocol extends VersionedProtocol {
-  /*
+  /* 
    *Changing the versionID to 2L since the getTaskCompletionEvents method has
    *changed.
    *Changed to 4 since killTask(String,boolean) is added
@@ -37,22 +37,23 @@
    * max_map_tasks and max_reduce_tasks for HADOOP-1274
    * Version 6: change the counters representation for HADOOP-2248
    * Version 7: added getAllJobs for HADOOP-2487
+   * Version 8: change {job|task}ids to use the corresponding objects rather than strings.
    */
-  public static final long versionID = 7L;
+  public static final long versionID = 8L;
 
   /**
    * Allocate a name for the job.
    * @return a unique job name for submitting jobs.
    * @throws IOException
    */
-  public String getNewJobId() throws IOException;
+  public JobID getNewJobId() throws IOException;
 
   /**
    * Submit a Job for execution.  Returns the latest profile for
    * that job.
    * The job files should be submitted in <b>system-dir</b>/<b>jobName</b>.
    */
-  public JobStatus submitJob(String jobName) throws IOException;
+  public JobStatus submitJob(JobID jobName) throws IOException;
 
   /**
    * Get the current status of the cluster
@@ -63,7 +64,7 @@
   /**
    * Kill the indicated job
    */
-  public void killJob(String jobid) throws IOException;
+  public void killJob(JobID jobid) throws IOException;
 
   /**
    * Kill indicated task attempt.
@@ -71,34 +72,34 @@
    * @param shouldFail if true the task is failed and added to failed tasks list, otherwise
    * it is just killed, w/o affecting job failure status.  
    */ 
-  public boolean killTask(String taskId, boolean shouldFail) throws IOException;
+  public boolean killTask(TaskAttemptID taskId, boolean shouldFail) throws IOException;
   
   /**
    * Grab a handle to a job that is already known to the JobTracker.
    * @return Profile of the job, or null if not found. 
    */
-  public JobProfile getJobProfile(String jobid) throws IOException;
+  public JobProfile getJobProfile(JobID jobid) throws IOException;
 
   /**
    * Grab a handle to a job that is already known to the JobTracker.
    * @return Status of the job, or null if not found.
    */
-  public JobStatus getJobStatus(String jobid) throws IOException;
+  public JobStatus getJobStatus(JobID jobid) throws IOException;
 
   /**
    * Grab the current job counters
    */
-  public Counters getJobCounters(String jobid) throws IOException;
+  public Counters getJobCounters(JobID jobid) throws IOException;
     
   /**
    * Grab a bunch of info on the map tasks that make up the job
    */
-  public TaskReport[] getMapTaskReports(String jobid) throws IOException;
+  public TaskReport[] getMapTaskReports(JobID jobid) throws IOException;
 
   /**
    * Grab a bunch of info on the reduce tasks that make up the job
    */
-  public TaskReport[] getReduceTaskReports(String jobid) throws IOException;
+  public TaskReport[] getReduceTaskReports(JobID jobid) throws IOException;
 
   /**
    * A MapReduce system always operates on a single filesystem.  This 
@@ -130,15 +131,15 @@
    * @return array of task completion events. 
    * @throws IOException
    */
-  public TaskCompletionEvent[] getTaskCompletionEvents(
-                                                       String jobid, int fromEventId, int maxEvents) throws IOException;
+  public TaskCompletionEvent[] getTaskCompletionEvents(JobID jobid
+      , int fromEventId, int maxEvents) throws IOException;
     
   /**
    * Get the diagnostics for a given task in a given job
-   * @param jobId the id of the job
+   * @param taskId the id of the task
    * @return an array of the diagnostic messages
    */
-  public String[] getTaskDiagnostics(String jobId, String tipId, String taskId) 
+  public String[] getTaskDiagnostics(TaskAttemptID taskId) 
   throws IOException;  
   
 }

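Bumping versionID to 8 makes this an incompatible RPC change: a client compiled against the version 7 string signatures cannot talk to a version 8 tracker. Client-side migration is mechanical, as the hedged sketch below shows; jobSubmitClient stands in for whatever JobSubmissionProtocol proxy the caller holds, and the forName conversion's input format is an assumption.

    import java.io.IOException;
    import org.apache.hadoop.mapred.JobSubmissionProtocol;
    import org.apache.hadoop.mapred.TaskAttemptID;

    public class KillTaskSketch {
      // jobSubmitClient stands in for an RPC proxy obtained elsewhere; this
      // method only illustrates the string-to-object migration at the edge.
      static boolean failAttempt(JobSubmissionProtocol jobSubmitClient,
                                 String taskIdString) throws IOException {
        // Version 7 sent taskIdString straight through; version 8 parses it
        // once, then passes the typed id everywhere.
        TaskAttemptID taskId = TaskAttemptID.forName(taskIdString);
        return jobSubmitClient.killTask(taskId, true); // true = fail, not just kill
      }
    }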

