hadoop-mapreduce-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1196458 [6/19] - in /hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project: ./ assembly/ bin/ conf/ dev-support/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-cli...
Date: Wed, 02 Nov 2011 05:35:03 GMT
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Wed Nov  2 05:34:31 2011
@@ -16,18 +16,19 @@
   <parent>
     <artifactId>hadoop-mapreduce-client</artifactId>
     <groupId>org.apache.hadoop</groupId>
-    <version>${hadoop-mapreduce.version}</version>
+    <version>0.24.0-SNAPSHOT</version>
   </parent>
   <modelVersion>4.0.0</modelVersion>
   <groupId>org.apache.hadoop</groupId>
   <artifactId>hadoop-mapreduce-client-core</artifactId>
+  <version>0.24.0-SNAPSHOT</version>
   <name>hadoop-mapreduce-client-core</name>
 
   <properties>
-    <install.file>${project.artifact.file}</install.file>
-    <mr.basedir>${project.parent.parent.basedir}</mr.basedir>
+    <!-- Needed for generating FindBugs warnings using parent pom -->
+    <mr.basedir>${project.parent.basedir}/..</mr.basedir>
   </properties>
-  
+
   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -35,6 +36,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-nodemanager</artifactId> 
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>
     </dependency>
   </dependencies>
@@ -42,6 +47,15 @@
   <build>
     <plugins>
       <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <systemPropertyVariables>
+            <log4j.configuration>file:///${project.parent.basedir}/../src/test/log4j.properties</log4j.configuration>
+          </systemPropertyVariables>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro-maven-plugin</artifactId>
         <version>1.5.3</version>
@@ -54,6 +68,24 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-antrun-plugin</artifactId>
+        <executions>
+          <execution>
+            <phase>pre-site</phase>
+            <goals>
+              <goal>run</goal>
+            </goals>
+            <configuration>
+              <tasks>
+                <copy file="src/main/resources/mapred-default.xml" todir="src/site/resources"/>
+                <copy file="../../../hadoop-common-project/hadoop-common/src/main/xsl/configuration.xsl" todir="src/site/resources"/>
+              </tasks>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>
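
The surefire stanza above injects a log4j.configuration system property into each forked test JVM, and log4j 1.x configures itself from that property at class-initialization time. A minimal sketch (hypothetical class, not part of this commit) of how the property surfaces inside a test fork:

public class Log4jConfigProbe {
  public static void main(String[] args) {
    // Set via surefire's <systemPropertyVariables> in the pom above.
    String cfg = System.getProperty("log4j.configuration");
    System.out.println("log4j config for this test JVM: " + cfg);
  }
}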

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Wed Nov  2 05:34:31 2011
@@ -69,6 +69,17 @@
       ]
      },
 
+     {"type": "record", "name": "AMStarted",
+      "fields": [
+          {"name": "applicationAttemptId", "type": "string"},
+          {"name": "startTime", "type": "long"},
+          {"name": "containerId", "type": "string"},
+          {"name": "nodeManagerHost", "type": "string"},
+          {"name": "nodeManagerPort", "type": "int"},
+          {"name": "nodeManagerHttpPort", "type": "int"}
+      ]
+     },
+
      {"type": "record", "name": "JobSubmitted",
       "fields": [
           {"name": "jobid", "type": "string"},
@@ -125,6 +136,7 @@
           {"name": "mapFinishTime", "type": "long"},
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
+          {"name": "rackname", "type": "string"},
           {"name": "state", "type": "string"},
           {"name": "counters", "type": "JhCounters"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
@@ -144,6 +156,7 @@
           {"name": "sortFinishTime", "type": "long"},
           {"name": "finishTime", "type": "long"},
           {"name": "hostname", "type": "string"},
+          {"name": "rackname", "type": "string"},
           {"name": "state", "type": "string"},
           {"name": "counters", "type": "JhCounters"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
@@ -173,7 +186,9 @@
           {"name": "attemptId", "type": "string"},
           {"name": "startTime", "type": "long"},
           {"name": "trackerName", "type": "string"},
-          {"name": "httpPort", "type": "int"}
+          {"name": "httpPort", "type": "int"},
+          {"name": "shufflePort", "type": "int"},
+          {"name": "containerId", "type": "string"}
       ]
      },
 
@@ -213,7 +228,7 @@
           {"name": "counters", "type": "JhCounters"}
       ]
      },
-
+     	
      {"type": "record", "name": "TaskStarted",
       "fields": [
           {"name": "taskid", "type": "string"},
@@ -244,6 +259,7 @@
           "TASK_FINISHED",
           "TASK_FAILED",
           "TASK_UPDATED",
+          "NORMALIZED_RESOURCE",
           "MAP_ATTEMPT_STARTED",
           "MAP_ATTEMPT_FINISHED",
           "MAP_ATTEMPT_FAILED",
@@ -259,7 +275,8 @@
           "CLEANUP_ATTEMPT_STARTED",
           "CLEANUP_ATTEMPT_FINISHED",
           "CLEANUP_ATTEMPT_FAILED",
-          "CLEANUP_ATTEMPT_KILLED"
+          "CLEANUP_ATTEMPT_KILLED",
+          "AM_STARTED"
           ]
      },
 
@@ -271,6 +288,7 @@
                "JobFinished",
                "JobInfoChange",
                "JobInited",
+               "AMStarted",
                "JobPriorityChange",
                "JobStatusChanged",
                "JobSubmitted",

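The new AMStarted record (plus the rackname, shufflePort, and containerId fields) is normally consumed through classes generated by the avro-maven-plugin, but the schema can also be exercised generically. A hedged sketch against Avro's generic API, assuming Events.avpr is readable from the working directory (all field values hypothetical):

import java.io.File;
import java.io.IOException;
import org.apache.avro.Protocol;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class AMStartedDemo {
  public static void main(String[] args) throws IOException {
    Protocol events = Protocol.parse(new File("src/main/avro/Events.avpr"));
    Schema amStarted = events.getType("AMStarted");
    GenericData.Record r = new GenericData.Record(amStarted);
    r.put("applicationAttemptId", "appattempt_1320212071000_0001_000001"); // hypothetical
    r.put("startTime", System.currentTimeMillis());
    r.put("containerId", "container_1320212071000_0001_01_000001");        // hypothetical
    r.put("nodeManagerHost", "nm.example.com");
    r.put("nodeManagerPort", 45454);
    r.put("nodeManagerHttpPort", 9999);
    System.out.println(r);  // prints the record as JSON
  }
}
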
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Wed Nov  2 05:34:31 2011
@@ -38,7 +38,8 @@ public class FileOutputCommitter extends
 
   public static final Log LOG = LogFactory.getLog(
       "org.apache.hadoop.mapred.FileOutputCommitter");
-/**
+  
+  /**
    * Temporary directory name 
    */
   public static final String TEMP_DIR_NAME = "_temporary";
@@ -50,7 +51,9 @@ public class FileOutputCommitter extends
     JobConf conf = context.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
-      Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
+      Path tmpDir = 
+          new Path(outputPath, getJobAttemptBaseDirName(context) + 
+              Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
       FileSystem fileSys = tmpDir.getFileSystem(conf);
       if (!fileSys.mkdirs(tmpDir)) {
         LOG.error("Mkdirs failed to create " + tmpDir.toString());
@@ -65,12 +68,33 @@ public class FileOutputCommitter extends
   }
   
   public void commitJob(JobContext context) throws IOException {
-    // delete the _temporary folder in the output folder
-    cleanupJob(context);
-    // check if the output-dir marking is required
-    if (shouldMarkOutputDir(context.getJobConf())) {
-      // create a _success file in the output folder
-      markOutputDirSuccessful(context);
+    //delete the task temp directory from the current jobtempdir
+    JobConf conf = context.getJobConf();
+    Path outputPath = FileOutputFormat.getOutputPath(conf);
+    if (outputPath != null) {
+      FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+      Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
+          Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
+      FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
+      if (fileSys.exists(tmpDir)) {
+        fileSys.delete(tmpDir, true);
+      } else {
+        LOG.warn("Task temp dir could not be deleted " + tmpDir);
+      }
+
+      //move the job output to final place
+      Path jobOutputPath = 
+          new Path(outputPath, getJobAttemptBaseDirName(context));
+      moveJobOutputs(outputFileSystem, 
+          jobOutputPath, outputPath, jobOutputPath);
+
+      // delete the _temporary folder in the output folder
+      cleanupJob(context);
+      // check if the output-dir marking is required
+      if (shouldMarkOutputDir(context.getJobConf())) {
+        // create a _success file in the output folder
+        markOutputDirSuccessful(context);
+      }
     }
   }
   
@@ -88,6 +112,39 @@ public class FileOutputCommitter extends
     }
   }
 
+  private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
+      Path finalOutputDir, Path jobOutput) throws IOException {
+    LOG.debug("Told to move job output from " + jobOutput
+        + " to " + finalOutputDir + 
+        " and orig job output path is " + origJobOutputPath);  
+    if (fs.isFile(jobOutput)) {
+      Path finalOutputPath = 
+          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
+      if (!fs.rename(jobOutput, finalOutputPath)) {
+        if (!fs.delete(finalOutputPath, true)) {
+          throw new IOException("Failed to delete earlier output of job");
+        }
+        if (!fs.rename(jobOutput, finalOutputPath)) {
+          throw new IOException("Failed to save output of job");
+        }
+      }
+      LOG.debug("Moved job output file from " + jobOutput + " to " + 
+          finalOutputPath);
+    } else if (fs.getFileStatus(jobOutput).isDirectory()) {
+      LOG.debug("Job output file " + jobOutput + " is a dir");      
+      FileStatus[] paths = fs.listStatus(jobOutput);
+      Path finalOutputPath = 
+          getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
+      fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along job output path " + finalOutputPath);
+      if (paths != null) {
+        for (FileStatus path : paths) {
+          moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
+        }
+      }
+    }
+  }
+  
   @Override
   @Deprecated
   public void cleanupJob(JobContext context) throws IOException {
@@ -128,9 +185,14 @@ public class FileOutputCommitter extends
       FileSystem fs = taskOutputPath.getFileSystem(job);
       context.getProgressible().progress();
       if (fs.exists(taskOutputPath)) {
-        Path jobOutputPath = taskOutputPath.getParent().getParent();
-        // Move the task outputs to their final place
-        moveTaskOutputs(context, fs, jobOutputPath, taskOutputPath);
+        // Move the task outputs to the current job attempt output dir
+        JobConf conf = context.getJobConf();
+        Path outputPath = FileOutputFormat.getOutputPath(conf);
+        FileSystem outputFileSystem = outputPath.getFileSystem(conf);
+        Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+        moveTaskOutputs(context, outputFileSystem, jobOutputPath, 
+            taskOutputPath);
+
         // Delete the temporary task-specific output directory
         if (!fs.delete(taskOutputPath, true)) {
           LOG.info("Failed to delete the temporary output" + 
@@ -149,8 +211,10 @@ public class FileOutputCommitter extends
   throws IOException {
     TaskAttemptID attemptId = context.getTaskAttemptID();
     context.getProgressible().progress();
+    LOG.debug("Told to move taskoutput from " + taskOutput
+        + " to " + jobOutputDir);    
     if (fs.isFile(taskOutput)) {
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
                                           getTempTaskOutputPath(context));
       if (!fs.rename(taskOutput, finalOutputPath)) {
         if (!fs.delete(finalOutputPath, true)) {
@@ -164,10 +228,12 @@ public class FileOutputCommitter extends
       }
       LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
     } else if(fs.getFileStatus(taskOutput).isDirectory()) {
+      LOG.debug("Taskoutput " + taskOutput + " is a dir");
       FileStatus[] paths = fs.listStatus(taskOutput);
-      Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput, 
+      Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput, 
 	          getTempTaskOutputPath(context));
       fs.mkdirs(finalOutputPath);
+      LOG.debug("Creating dirs along path " + finalOutputPath);
       if (paths != null) {
         for (FileStatus path : paths) {
           moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
@@ -185,13 +251,16 @@ public class FileOutputCommitter extends
     }
   }
 
-  private Path getFinalPath(Path jobOutputDir, Path taskOutput, 
+  @SuppressWarnings("deprecation")
+  private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput, 
                             Path taskOutputPath) throws IOException {
-    URI taskOutputUri = taskOutput.toUri();
-    URI relativePath = taskOutputPath.toUri().relativize(taskOutputUri);
-    if (taskOutputUri == relativePath) {//taskOutputPath is not a parent of taskOutput
+    URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
+    URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
+    URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
+    if (taskOutputUri == relativePath) { 
+      //taskOutputPath is not a parent of taskOutput
       throw new IOException("Can not get the relative path: base = " + 
-          taskOutputPath + " child = " + taskOutput);
+          taskOutputPathUri + " child = " + taskOutputUri);
     }
     if (relativePath.getPath().length() > 0) {
       return new Path(jobOutputDir, relativePath.getPath());
@@ -216,7 +285,8 @@ public class FileOutputCommitter extends
     return false;
   }
 
-  Path getTempTaskOutputPath(TaskAttemptContext taskContext) throws IOException {
+  Path getTempTaskOutputPath(TaskAttemptContext taskContext) 
+      throws IOException {
     JobConf conf = taskContext.getJobConf();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
@@ -247,4 +317,63 @@ public class FileOutputCommitter extends
     }
     return taskTmpDir;
   }
+  
+  @Override
+  public boolean isRecoverySupported() {
+    return true;
+  }
+  
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
+    context.progress();
+    Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
+    int previousAttempt =         
+        context.getConfiguration().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
+    if (previousAttempt < 0) {
+      LOG.warn("Cannot recover task output for first attempt...");
+      return;
+    }
+
+    FileSystem outputFileSystem = 
+        outputPath.getFileSystem(context.getJobConf());
+    Path pathToRecover = 
+        new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
+    if (outputFileSystem.exists(pathToRecover)) {
+      // Move the task outputs to their final place
+      LOG.debug("Trying to recover task from " + pathToRecover
+          + " into " + jobOutputPath);
+      moveJobOutputs(outputFileSystem, 
+          pathToRecover, jobOutputPath, pathToRecover);
+      LOG.info("Saved output of job to " + jobOutputPath);
+    }
+  }
+
+  protected static String getJobAttemptBaseDirName(JobContext context) {
+    int appAttemptId = 
+        context.getJobConf().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobTempDirName(TaskAttemptContext context) {
+    int appAttemptId = 
+        context.getJobConf().getInt(
+            MRConstants.APPLICATION_ATTEMPT_ID, 0);
+    return getJobAttemptBaseDirName(appAttemptId);
+  }
+
+  protected static String getJobAttemptBaseDirName(int appAttemptId) {
+    return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+        appAttemptId;
+  }
+
+  protected static String getTaskAttemptBaseDirName(
+      TaskAttemptContext context) {
+    return getJobTempDirName(context) + Path.SEPARATOR + 
+      FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+      "_" + context.getTaskAttemptID().toString();
+  }
 }
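
The recovery support hinges on an application-attempt-scoped layout under the job's output directory: each attempt commits task output into _temporary/<appAttemptId>, and a restarted AM moves the previous attempt's tree into its own before commitJob promotes everything to the final output directory. A standalone sketch of the path arithmetic (output dir and attempt ids hypothetical):

public class LayoutDemo {
  static final String TEMP_DIR_NAME = "_temporary";

  // Mirrors getJobAttemptBaseDirName(int) above.
  static String jobAttemptBaseDirName(int appAttemptId) {
    return TEMP_DIR_NAME + "/" + appAttemptId;
  }

  public static void main(String[] args) {
    String out = "/out";                                   // hypothetical output dir
    String task = "attempt_1320212071000_0001_m_000000_1"; // hypothetical attempt id
    // Scratch dir a running task writes to (see getTaskAttemptBaseDirName):
    System.out.println(out + "/" + jobAttemptBaseDirName(1)
        + "/" + TEMP_DIR_NAME + "/_" + task);
    // Tree a restarted AM (attempt 2) recovers via recoverTask/moveJobOutputs:
    System.out.println(out + "/" + jobAttemptBaseDirName(1));
  }
}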

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Wed Nov  2 05:34:31 2011
@@ -43,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -459,6 +460,37 @@ public class JobClient extends CLI {
     cluster = new Cluster(conf);
   }
 
+  @InterfaceAudience.Private
+  public static class Renewer extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return DelegationTokenIdentifier.MAPREDUCE_DELEGATION_KIND.equals(kind);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token, Configuration conf
+                      ) throws IOException, InterruptedException {
+      return new Cluster(conf).
+        renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token, Configuration conf
+                       ) throws IOException, InterruptedException {
+      new Cluster(conf).
+        cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+    
+  }
+
   /**
    * Build a job client, connect to the indicated job tracker.
    * 
@@ -1048,22 +1080,24 @@ public class JobClient extends CLI {
    * @return true if the renewal went well
    * @throws InvalidToken
    * @throws IOException
+   * @deprecated Use {@link Token#renew} instead
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException, 
                                             InterruptedException {
-    return cluster.renewDelegationToken(token);
+    return token.renew(getConf());
   }
 
   /**
    * Cancel a delegation token from the JobTracker
    * @param token the token to cancel
    * @throws IOException
+   * @deprecated Use {@link Token#cancel} instead
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                     ) throws InvalidToken, IOException, 
                                              InterruptedException {
-    cluster.cancelDelegationToken(token);
+    token.cancel(getConf());
   }
 
   /**

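With the Renewer registered, renewal and cancellation go through the generic Token API; TokenRenewer implementations are matched by token kind via handleKind(), and the deprecated JobClient methods above now simply delegate. A hedged usage sketch (renewer principal hypothetical):

JobConf conf = new JobConf();
JobClient jobClient = new JobClient(conf);
Token<DelegationTokenIdentifier> token =
    jobClient.getDelegationToken(new Text("renewer/host@EXAMPLE.COM")); // hypothetical
long nextExpiry = token.renew(conf);  // dispatches to JobClient.Renewer
token.cancel(conf);                   // likewise
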
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Wed Nov  2 05:34:31 2011
@@ -1649,7 +1649,7 @@ public class JobConf extends Configurati
    * @see #setJobEndNotificationURI(String)
    */
   public String getJobEndNotificationURI() {
-    return get(JobContext.END_NOTIFICATION_URL);
+    return get(JobContext.MR_JOB_END_NOTIFICATION_URL);
   }
 
   /**
@@ -1669,7 +1669,7 @@ public class JobConf extends Configurati
    *       JobCompletionAndChaining">Job Completion and Chaining</a>
    */
   public void setJobEndNotificationURI(String uri) {
-    set(JobContext.END_NOTIFICATION_URL, uri);
+    set(JobContext.MR_JOB_END_NOTIFICATION_URL, uri);
   }
 
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobEndNotifier.java Wed Nov  2 05:34:31 2011
@@ -102,8 +102,8 @@ public class JobEndNotifier {
     String uri = conf.getJobEndNotificationURI();
     if (uri != null) {
       // +1 to make logic for first notification identical to a retry
-      int retryAttempts = conf.getInt(JobContext.END_NOTIFICATION_RETRIES, 0) + 1;
-      long retryInterval = conf.getInt(JobContext.END_NOTIFICATION_RETRIE_INTERVAL, 30000);
+      int retryAttempts = conf.getInt(JobContext.MR_JOB_END_RETRY_ATTEMPTS, 0) + 1;
+      long retryInterval = conf.getInt(JobContext.MR_JOB_END_RETRY_INTERVAL, 30000);
       if (uri.contains("$jobId")) {
         uri = uri.replace("$jobId", status.getJobID().toString());
       }
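
A hedged configuration sketch for the renamed notification keys (endpoint hypothetical; $jobId is substituted into the URL as shown above):

JobConf conf = new JobConf();
conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
    "http://example.com/notify?job=$jobId");               // hypothetical endpoint
conf.setInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 3);     // retries after the first try
conf.setInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 30000); // 30s between tries (the default)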

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MRConstants.java Wed Nov  2 05:34:31 2011
@@ -60,4 +60,9 @@ public interface MRConstants {
   
   /** Used in MRv1, mostly in TaskTracker code **/
   public static final String WORKDIR = "work";
+
+  /** Used only by MRv2 */
+  public static final String APPLICATION_ATTEMPT_ID =
+      "mapreduce.job.application.attempt.id";
+
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Wed Nov  2 05:34:31 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -36,8 +37,10 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.SequenceFile;
@@ -1727,10 +1730,10 @@ class MapTask extends Task {
         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
       }
       if (numSpills == 1) { //the spill is the final output
-        rfs.rename(filename[0],
+        sameVolRename(filename[0],
             mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
         if (indexCacheList.size() == 0) {
-          rfs.rename(mapOutputFile.getSpillIndexFile(0),
+          sameVolRename(mapOutputFile.getSpillIndexFile(0),
             mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
         } else {
           indexCacheList.get(0).writeToFile(
@@ -1847,7 +1850,29 @@ class MapTask extends Task {
         }
       }
     }
-
+    
+    /**
+     * Rename srcPath to dstPath on the same volume. This is the same
+     * as RawLocalFileSystem's rename method, except that it will not
+     * fall back to a copy, and it will create the target directory
+     * if it doesn't exist.
+     */
+    private void sameVolRename(Path srcPath,
+        Path dstPath) throws IOException {
+      RawLocalFileSystem rfs = (RawLocalFileSystem)this.rfs;
+      File src = rfs.pathToFile(srcPath);
+      File dst = rfs.pathToFile(dstPath);
+      if (!dst.getParentFile().exists()) {
+        if (!dst.getParentFile().mkdirs()) {
+          throw new IOException("Unable to rename " + src + " to "
+              + dst + ": couldn't create parent directory"); 
+        }
+      }
+      
+      if (!src.renameTo(dst)) {
+        throw new IOException("Unable to rename " + src + " to " + dst);
+      }
+    }
   } // MapOutputBuffer
   
   /**

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Master.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Master.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Master.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Master.java Wed Nov  2 05:34:31 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 
 public class Master {
   
@@ -33,20 +34,35 @@ public class Master {
   }
 
   public static String getMasterUserName(Configuration conf) {
-    return conf.get(MRConfig.MASTER_USER_NAME);
+    String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    if (framework.equals(MRConfig.CLASSIC_FRAMEWORK_NAME)) {    
+      return conf.get(MRConfig.MASTER_USER_NAME);
+    } 
+    else {
+      return conf.get(YarnConfiguration.RM_PRINCIPAL);
+    }
   }
   
   public static InetSocketAddress getMasterAddress(Configuration conf) {
-    String jobTrackerStr =
-      conf.get(MRConfig.MASTER_ADDRESS, "localhost:8012");
-    return NetUtils.createSocketAddr(jobTrackerStr);
+    String masterAddress;
+    String framework = conf.get(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
+    if (framework.equals(MRConfig.CLASSIC_FRAMEWORK_NAME)) {
+      masterAddress = conf.get(MRConfig.MASTER_ADDRESS, "localhost:8012");
+      return NetUtils.createSocketAddr(masterAddress, 8012, MRConfig.MASTER_ADDRESS);
+    } 
+    else {
+      masterAddress = conf.get(YarnConfiguration.RM_ADDRESS,
+          YarnConfiguration.DEFAULT_RM_ADDRESS);
+      return NetUtils.createSocketAddr(masterAddress, YarnConfiguration.DEFAULT_RM_PORT,
+        YarnConfiguration.RM_ADDRESS);
+    }
   }
 
   public static String getMasterPrincipal(Configuration conf) 
   throws IOException {
-    String jtHostname = getMasterAddress(conf).getHostName();
-    // get jobtracker principal for use as delegation token renewer
-    return SecurityUtil.getServerPrincipal(getMasterUserName(conf), jtHostname);
+    String masterHostname = getMasterAddress(conf).getHostName();
+    // get kerberos principal for use as delegation token renewer
+    return SecurityUtil.getServerPrincipal(getMasterUserName(conf), masterHostname);
   }
   
 }
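
A hedged sketch of the switch above: the same call now resolves the JobTracker address under the classic framework and the ResourceManager address otherwise (host names hypothetical):

Configuration conf = new Configuration();
conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
conf.set(MRConfig.MASTER_ADDRESS, "jt.example.com:8012");
InetSocketAddress classic = Master.getMasterAddress(conf); // -> jt.example.com:8012

conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.YARN_FRAMEWORK_NAME);
conf.set(YarnConfiguration.RM_ADDRESS, "rm.example.com:8040");
InetSocketAddress yarn = Master.getMasterAddress(conf);    // -> rm.example.com:8040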

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java Wed Nov  2 05:34:31 2011
@@ -152,6 +152,33 @@ public abstract class OutputCommitter 
    * is a bridge between the two.
    */
   @Override
+  public boolean isRecoverySupported() {
+    return false;
+  }
+
+  /**
+   * Recover the task output. 
+   * 
+   * The retry-count for the job will be passed via the 
+   * {@link MRConstants#APPLICATION_ATTEMPT_ID} key in  
+   * {@link TaskAttemptContext#getConfiguration()} for the 
+   * <code>OutputCommitter</code>.
+   * 
+   * If an exception is thrown the task will be attempted again. 
+   * 
+   * @param taskContext Context of the task whose output is being recovered
+   * @throws IOException
+   */
+  public void recoverTask(TaskAttemptContext taskContext) 
+  throws IOException {
+  }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
   public final void setupJob(org.apache.hadoop.mapreduce.JobContext jobContext
                              ) throws IOException {
     setupJob((JobContext) jobContext);
@@ -246,4 +273,17 @@ public abstract class OutputCommitter 
                  ) throws IOException {
     abortTask((TaskAttemptContext) taskContext);
   }
+  
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this
+   * is a bridge between the two.
+   */
+  @Override
+  public final 
+  void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext
+      ) throws IOException {
+    recoverTask((TaskAttemptContext) taskContext);
+  }
+
 }
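
Committers opt in to recovery explicitly: the default above reports isRecoverySupported() as false and leaves recoverTask() a no-op. A hedged sketch of a subclass opting in (class name and recovery strategy hypothetical):

import java.io.IOException;
import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapred.TaskAttemptContext;

public class RecoverableCommitter extends FileOutputCommitter {
  @Override
  public boolean isRecoverySupported() {
    return true;  // tell the AM that recoverTask() is meaningful
  }

  @Override
  public void recoverTask(TaskAttemptContext context) throws IOException {
    // The retry count arrives via mapreduce.job.application.attempt.id:
    int appAttempt = context.getJobConf()
        .getInt(MRConstants.APPLICATION_ATTEMPT_ID, 0); // 0 on the first attempt
    super.recoverTask(context);  // reuse FileOutputCommitter's directory moves
  }
}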

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ReduceTask.java Wed Nov  2 05:34:31 2011
@@ -342,10 +342,14 @@ public class ReduceTask extends Task {
     RawKeyValueIterator rIter = null;
     
     boolean isLocal = false; 
-    // local iff framework == classic && master address == local
-    String framework = job.get(MRConfig.FRAMEWORK_NAME, MRConfig.CLASSIC_FRAMEWORK_NAME);
-    if (framework.equals(MRConfig.CLASSIC_FRAMEWORK_NAME)) {
-    	isLocal = "local".equals(job.get(MRConfig.MASTER_ADDRESS, "local"));        	
+    // local if
+    // 1) framework == local or
+    // 2) framework == null and job tracker address == local
+    String framework = job.get(MRConfig.FRAMEWORK_NAME);
+    String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
+    if ((framework == null && masterAddr.equals("local"))
+        || (framework != null && framework.equals(MRConfig.LOCAL_FRAMEWORK_NAME))) {
+      isLocal = true;
     }
     
     if (!isLocal) {

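The local-mode decision above, extracted as a standalone predicate for clarity (a sketch, not part of this commit):

// Local iff mapreduce.framework.name is "local", or it is unset and the
// classic master address is (or defaults to) "local".
static boolean runsLocally(JobConf job) {
  String framework = job.get(MRConfig.FRAMEWORK_NAME);   // may be null
  String masterAddr = job.get(MRConfig.MASTER_ADDRESS, "local");
  return MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)
      || (framework == null && "local".equals(masterAddr));
}
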
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Wed Nov  2 05:34:31 2011
@@ -1119,7 +1119,7 @@ abstract public class Task implements Wr
     // delete the staging area for the job
     JobConf conf = new JobConf(jobContext.getConfiguration());
     if (!keepTaskFiles(conf)) {
-      String jobTempDir = conf.get("mapreduce.job.dir");
+      String jobTempDir = conf.get(MRJobConfig.MAPREDUCE_JOB_DIR);
       Path jobTempDirPath = new Path(jobTempDir);
       FileSystem fs = jobTempDirPath.getFileSystem(conf);
       fs.delete(jobTempDirPath, true);

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLogAppender.java Wed Nov  2 05:34:31 2011
@@ -93,7 +93,9 @@ public class TaskLogAppender extends Fil
   }
   
   public void flush() {
-    qw.flush();
+    if (qw != null) {
+      qw.flush();
+    }
   }
 
   @Override

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Cluster.java Wed Nov  2 05:34:31 2011
@@ -25,6 +25,8 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.ServiceLoader;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +40,7 @@ import org.apache.hadoop.mapreduce.proto
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
 import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
+import org.apache.hadoop.mapreduce.v2.LogParams;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -62,7 +65,11 @@ public class Cluster {
   private Path sysDir = null;
   private Path stagingAreaDir = null;
   private Path jobHistoryDir = null;
+  private static final Log LOG = LogFactory.getLog(Cluster.class);
 
+  private static ServiceLoader<ClientProtocolProvider> frameworkLoader =
+      ServiceLoader.load(ClientProtocolProvider.class);
+  
   static {
     ConfigUtil.loadResources();
   }
@@ -81,19 +88,34 @@ public class Cluster {
   private void initialize(InetSocketAddress jobTrackAddr, Configuration conf)
       throws IOException {
 
-    for (ClientProtocolProvider provider : ServiceLoader
-        .load(ClientProtocolProvider.class)) {
-      ClientProtocol clientProtocol = null;
-      if (jobTrackAddr == null) {
-        clientProtocol = provider.create(conf);
-      } else {
-        clientProtocol = provider.create(jobTrackAddr, conf);
-      }
+    synchronized (frameworkLoader) {
+      for (ClientProtocolProvider provider : frameworkLoader) {
+        LOG.debug("Trying ClientProtocolProvider : "
+            + provider.getClass().getName());
+        ClientProtocol clientProtocol = null; 
+        try {
+          if (jobTrackAddr == null) {
+            clientProtocol = provider.create(conf);
+          } else {
+            clientProtocol = provider.create(jobTrackAddr, conf);
+          }
 
-      if (clientProtocol != null) {
-        clientProtocolProvider = provider;
-        client = clientProtocol;
-        break;
+          if (clientProtocol != null) {
+            clientProtocolProvider = provider;
+            client = clientProtocol;
+            LOG.debug("Picked " + provider.getClass().getName()
+                + " as the ClientProtocolProvider");
+            break;
+          }
+          else {
+            LOG.info("Cannot pick " + provider.getClass().getName()
+                + " as the ClientProtocolProvider - returned null protocol");
+          }
+        } 
+        catch (Exception e) {
+          LOG.info("Failed to use " + provider.getClass().getName()
+              + " due to error: " + e.getMessage());
+        }
       }
     }
 
@@ -191,7 +213,20 @@ public class Cluster {
       throws IOException, InterruptedException {
     return client.getQueue(name);
   }
-  
+
+  /**
+   * Get log parameters for the specified jobID or taskAttemptID
+   * @param jobID the job id.
+   * @param taskAttemptID the task attempt id. Optional.
+   * @return the LogParams
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public LogParams getLogParams(JobID jobID, TaskAttemptID taskAttemptID)
+      throws IOException, InterruptedException {
+    return client.getLogFileParams(jobID, taskAttemptID);
+  }
+
   /**
    * Get current cluster status.
    * 
@@ -371,6 +406,7 @@ public class Cluster {
    * @return the new expiration time
    * @throws InvalidToken
    * @throws IOException
+   * @deprecated Use {@link Token#renew} instead
    */
   public long renewDelegationToken(Token<DelegationTokenIdentifier> token
                                    ) throws InvalidToken, IOException,
@@ -387,6 +423,7 @@ public class Cluster {
    * Cancel a delegation token from the JobTracker
    * @param token the token to cancel
    * @throws IOException
+   * @deprecated Use {@link Token#cancel} instead
    */
   public void cancelDelegationToken(Token<DelegationTokenIdentifier> token
                                     ) throws IOException,

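ClientProtocolProvider discovery uses Java's standard ServiceLoader: each framework jar lists its provider in META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider, and initialize() picks the first one whose create() returns a non-null protocol. A hedged sketch of the provider side (class and framework name hypothetical):

import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;

public class MyClientProtocolProvider extends ClientProtocolProvider {
  @Override
  public ClientProtocol create(Configuration conf) throws IOException {
    if (!"myframework".equals(conf.get(MRConfig.FRAMEWORK_NAME))) {
      return null;  // not ours; Cluster.initialize() tries the next provider
    }
    return null;    // would construct this framework's ClientProtocol here
  }

  @Override
  public ClientProtocol create(InetSocketAddress addr, Configuration conf)
      throws IOException {
    return create(conf);
  }

  @Override
  public void close(ClientProtocol clientProtocol) throws IOException {
  }
}
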
Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/ContextFactory.java Wed Nov  2 05:34:31 2011
@@ -123,7 +123,7 @@ public class ContextFactory {
         WRAPPED_CONTEXT_FIELD = null;
       }
       MAP_CONTEXT_CONSTRUCTOR.setAccessible(true);
-      REPORTER_FIELD = taskIOContextCls.getDeclaredField("reporter");
+      REPORTER_FIELD = taskContextCls.getDeclaredField("reporter");
       REPORTER_FIELD.setAccessible(true);
       READER_FIELD = mapContextCls.getDeclaredField("reader");
       READER_FIELD.setAccessible(true);
@@ -141,7 +141,8 @@ public class ContextFactory {
   }
 
   /**
-   * Clone a job or task attempt context with a new configuration.
+   * Clone a {@link JobContext} or {@link TaskAttemptContext} with a 
+   * new configuration.
    * @param original the original context
    * @param conf the new configuration
    * @return a new context object
@@ -176,7 +177,8 @@ public class ContextFactory {
   }
   
   /**
-   * Copy a mapper context, optionally replacing the input and output.
+   * Copy a custom WrappedMapper.Context, optionally replacing 
+   * the input and output.
    * @param <K1> input key type
    * @param <V1> input value type
    * @param <K2> output key type

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Wed Nov  2 05:34:31 2011
@@ -92,6 +92,11 @@ public class JobStatus implements Writab
   private boolean isRetired;
   private String historyFile = "";
   private String trackingUrl ="";
+  private int numUsedSlots;
+  private int numReservedSlots;
+  private int usedMem;
+  private int reservedMem;
+  private int neededMem;
 
     
   /**
@@ -487,6 +492,76 @@ public class JobStatus implements Writab
     return historyFile;
   }
 
+  /**
+   * @return number of used mapred slots
+   */
+  public int getNumUsedSlots() {
+    return numUsedSlots;
+  }
+
+  /**
+   * @param n number of used mapred slots
+   */
+  public void setNumUsedSlots(int n) {
+    numUsedSlots = n;
+  }
+
+  /**
+   * @return the number of reserved slots
+   */
+  public int getNumReservedSlots() {
+    return numReservedSlots;
+  }
+
+  /**
+   * @param n the number of reserved slots
+   */
+  public void setNumReservedSlots(int n) {
+    this.numReservedSlots = n;
+  }
+
+  /**
+   * @return the used memory
+   */
+  public int getUsedMem() {
+    return usedMem;
+  }
+
+  /**
+   * @param m the used memory
+   */
+  public void setUsedMem(int m) {
+    this.usedMem = m;
+  }
+
+  /**
+   * @return the reserved memory
+   */
+  public int getReservedMem() {
+    return reservedMem;
+  }
+
+  /**
+   * @param r the reserved memory
+   */
+  public void setReservedMem(int r) {
+    this.reservedMem = r;
+  }
+
+  /**
+   * @return the needed memory
+   */
+  public int getNeededMem() {
+    return neededMem;
+  }
+
+  /**
+   * @param n the needed memory
+   */
+  public void setNeededMem(int n) {
+    this.neededMem = n;
+  }
+
   public String toString() {
     StringBuffer buffer = new StringBuffer();
     buffer.append("job-id : " + jobid);
@@ -499,6 +574,11 @@ public class JobStatus implements Writab
     buffer.append("user-name : " + user);
     buffer.append("priority : " + priority);
     buffer.append("scheduling-info : " + schedulingInfo);
+    buffer.append("num-used-slots : " + numUsedSlots);
+    buffer.append("num-reserved-slots : " + numReservedSlots);
+    buffer.append("used-mem : " + usedMem);
+    buffer.append("reserved-mem : " + reservedMem);
+    buffer.append("needed-mem : " + neededMem);
     return buffer.toString();
   }
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java Wed Nov  2 05:34:31 2011
@@ -322,6 +322,9 @@ class JobSubmitter {
   JobStatus submitJobInternal(Job job, Cluster cluster) 
   throws ClassNotFoundException, InterruptedException, IOException {
 
+    //validate the jobs output specs 
+    checkSpecs(job);
+    
     Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, 
                                                      job.getConfiguration());
     //configure the command line options correctly on the submitting dfs
@@ -338,7 +341,9 @@ class JobSubmitter {
     Path submitJobDir = new Path(jobStagingArea, jobId.toString());
     JobStatus status = null;
     try {
-      conf.set("mapreduce.job.dir", submitJobDir.toString());
+      conf.set("hadoop.http.filter.initializers", 
+          "org.apache.hadoop.yarn.server.webproxy.amfilter.AmFilterInitializer");
+      conf.set(MRJobConfig.MAPREDUCE_JOB_DIR, submitJobDir.toString());
       LOG.debug("Configuring job " + jobId + " with " + submitJobDir 
           + " as the submit dir");
       // get delegation token for the dir
@@ -349,8 +354,6 @@ class JobSubmitter {
 
       copyAndConfigureFiles(job, submitJobDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
-
-      checkSpecs(job);
       
       // Create the splits for the job
       LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRConfig.java Wed Nov  2 05:34:31 2011
@@ -67,6 +67,7 @@ public interface MRConfig {
   public static final String FRAMEWORK_NAME  = "mapreduce.framework.name";
   public static final String CLASSIC_FRAMEWORK_NAME  = "classic";
   public static final String YARN_FRAMEWORK_NAME  = "yarn";
+  public static final String LOCAL_FRAMEWORK_NAME = "local";
 
   public static final String TASK_LOCAL_OUTPUT_CLASS =
   "mapreduce.task.local.output.class";

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Wed Nov  2 05:34:31 2011
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.mapreduce;
 
+import org.apache.hadoop.util.PlatformName;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -90,12 +91,6 @@ public interface MRJobConfig {
 
   public static final String WORKING_DIR = "mapreduce.job.working.dir";
 
-  public static final String END_NOTIFICATION_URL = "mapreduce.job.end-notification.url";
-
-  public static final String END_NOTIFICATION_RETRIES = "mapreduce.job.end-notification.retry.attempts";
-
-  public static final String END_NOTIFICATION_RETRIE_INTERVAL = "mapreduce.job.end-notification.retry.interval";
-
   public static final String CLASSPATH_ARCHIVES = "mapreduce.job.classpath.archives";
 
   public static final String CLASSPATH_FILES = "mapreduce.job.classpath.files";
@@ -237,6 +232,8 @@ public interface MRJobConfig {
   public static final String REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
 
   public static final String REDUCE_ULIMIT = "mapreduce.reduce.ulimit";
+  
+  public static final String MAPREDUCE_JOB_DIR = "mapreduce.job.dir";
 
   public static final String REDUCE_MAX_ATTEMPTS = "mapreduce.reduce.maxattempts";
 
@@ -272,7 +269,12 @@ public interface MRJobConfig {
 
   public static final String JOB_ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
 
+  public static final String DEFAULT_JOB_ACL_VIEW_JOB = " ";
+
   public static final String JOB_ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
+
+  public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
+
   public static final String JOB_SUBMITHOST =
     "mapreduce.job.submithostname";
   public static final String JOB_SUBMITHOSTADDR =
@@ -323,9 +325,9 @@ public interface MRJobConfig {
   public static final String DEFAULT_MR_AM_COMMAND_OPTS = "-Xmx1536m";
 
   /** Root Logging level passed to the MR app master.*/
-  public static final String MR_AM_LOG_OPTS = 
-    MR_AM_PREFIX+"log-opts";
-  public static final String DEFAULT_MR_AM_LOG_OPTS = "INFO";
+  public static final String MR_AM_LOG_LEVEL = 
+    MR_AM_PREFIX+"log.level";
+  public static final String DEFAULT_MR_AM_LOG_LEVEL = "INFO";
 
   /**The number of splits when reporting progress in MR*/
   public static final String MR_AM_NUM_PROGRESS_SPLITS = 
@@ -384,11 +386,11 @@ public interface MRJobConfig {
     MR_AM_PREFIX
     + "job.task.estimator.exponential.smooth.lambda-ms";
 
-  public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMNOOTH_LAMBDA_MS = 
+  public static final long DEFAULT_MR_AM_TASK_ESTIMATOR_SMOOTH_LAMBDA_MS = 
   1000L * 60;
 
   /** true if the smoothing rate should be exponential.*/
-  public static final String MR_AM_TASK_EXTIMATOR_EXPONENTIAL_RATE_ENABLE =
+  public static final String MR_AM_TASK_ESTIMATOR_EXPONENTIAL_RATE_ENABLE =
     MR_AM_PREFIX + "job.task.estimator.exponential.smooth.rate";
 
   /** The number of threads used to handle task RPC calls.*/
@@ -402,6 +404,15 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS = 2000;
 
   /**
+   * If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
+   * milliseconds before aborting. During this interval, AM will still try
+   * to contact the RM.
+   */
+  public static final String MR_AM_TO_RM_WAIT_INTERVAL_MS =
+    MR_AM_PREFIX + "scheduler.connection.wait.interval-ms";
+  public static final int DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS = 360000;
+
+  /**
    * Boolean. Create the base dirs in the JobHistoryEventHandler
    * Set to false for multi-user clusters.  This is an internal config that
    * is set by the MR framework and read by it too.
@@ -428,7 +439,7 @@ public interface MRJobConfig {
       "mapreduce.admin.user.env";
 
   public static final String DEFAULT_MAPRED_ADMIN_USER_ENV =
-      "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib";
+      "LD_LIBRARY_PATH=$HADOOP_COMMON_HOME/lib/native/" + PlatformName.getPlatformName();
 
   public static final String WORKDIR = "work";
 
@@ -436,10 +447,13 @@ public interface MRJobConfig {
 
   public static final String HADOOP_WORK_DIR = "HADOOP_WORK_DIR";
 
+  // Environment variables used by Pipes. (TODO: these
+  // do not appear to be used by current pipes source code!)
   public static final String STDOUT_LOGFILE_ENV = "STDOUT_LOGFILE_ENV";
-
   public static final String STDERR_LOGFILE_ENV = "STDERR_LOGFILE_ENV";
 
+  public static final String APPLICATION_ATTEMPT_ID_ENV = "APPLICATION_ATTEMPT_ID_ENV";
+
   // This should be the directory where splits file gets localized on the node
   // running ApplicationMaster.
   public static final String JOB_SUBMIT_DIR = "jobSubmitDir";
@@ -475,4 +489,33 @@ public interface MRJobConfig {
 
   public static final String APPLICATION_ATTEMPT_ID =
       "mapreduce.job.application.attempt.id";
+
+  /**
+   * Job end notification.
+   */
+  public static final String MR_JOB_END_NOTIFICATION_URL =
+    "mapreduce.job.end-notification.url";
+
+  public static final String MR_JOB_END_RETRY_ATTEMPTS =
+    "mapreduce.job.end-notification.retry.attempts";
+
+  public static final String MR_JOB_END_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.retry.interval";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS =
+    "mapreduce.job.end-notification.max.attempts";
+
+  public static final String MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL =
+    "mapreduce.job.end-notification.max.retry.interval";
+
+  /*
+   * MR AM Service Authorization
+   */
+  public static final String
+      MR_AM_SECURITY_SERVICE_AUTHORIZATION_TASK_UMBILICAL =
+      "security.job.task.protocol.acl";
+  public static final String
+      MR_AM_SECURITY_SERVICE_AUTHORIZATION_CLIENT =
+      "security.job.client.protocol.acl";
+
 }
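
The MRJobConfig changes above move the job-end-notification keys to the
MR_JOB_END_* constants and add MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS and
MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL to bound the retries. A minimal
client-side sketch, assuming the standard Configuration API; the URL and the
retry values are illustrative placeholders, not part of this commit:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class EndNotificationExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // $jobId and $jobStatus are substituted by the notifier when the
        // callback fires.
        conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL,
            "http://example.com/notify?id=$jobId&status=$jobStatus");
        conf.setInt(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, 3);
        conf.setInt(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, 1000); // ms
      }
    }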

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java Wed Nov  2 05:34:31 2011
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.Count
 import org.apache.avro.Schema;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.JsonDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.specific.SpecificDatumReader;
 
@@ -146,8 +145,10 @@ public class EventReader implements Clos
       result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
     case CLEANUP_ATTEMPT_KILLED:
       result = new TaskAttemptUnsuccessfulCompletionEvent(); break;
+    case AM_STARTED:
+      result = new AMStartedEvent(); break;
     default:
-      throw new RuntimeException("unexpected event type!");
+      throw new RuntimeException("unexpected event type: " + wrapper.type);
     }
     result.setDatum(wrapper.event);
     return result;
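
EventReader now recognizes the new AM_STARTED record and reports the
offending type when it meets one it does not know. A sketch of consuming a
history stream with the new case, assuming the (FileSystem, Path) constructor
and that getNextEvent() returns null at end of file, which is how
JobHistoryParser drives the reader; the path is a placeholder:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
    import org.apache.hadoop.mapreduce.jobhistory.EventReader;
    import org.apache.hadoop.mapreduce.jobhistory.EventType;
    import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;

    public class AMStartedScan {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        EventReader reader =
            new EventReader(fs, new Path("/history/job_x.jhist"));
        try {
          HistoryEvent event;
          while ((event = reader.getNextEvent()) != null) {
            if (event.getEventType() == EventType.AM_STARTED) {
              AMStartedEvent am = (AMStartedEvent) event;
              System.out.println("AM " + am.getAppAttemptId()
                  + " started in " + am.getContainerId());
            }
          }
        } finally {
          reader.close();
        }
      }
    }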

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventWriter.java Wed Nov  2 05:34:31 2011
@@ -25,7 +25,6 @@ import org.apache.avro.Schema;
 import org.apache.avro.io.DatumWriter;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
-import org.apache.avro.io.JsonEncoder;
 import org.apache.avro.specific.SpecificDatumWriter;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.logging.Log;
@@ -72,6 +71,7 @@ class EventWriter {
   void flush() throws IOException {
     encoder.flush();
     out.flush();
+    out.hflush();
   }
 
   void close() throws IOException {
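
The extra hflush() matters because out is an FSDataOutputStream writing to
HDFS: flush() alone only drains client-side buffers, while hflush() pushes
the bytes to the datanode pipeline so a concurrent reader (for instance the
history server tailing a job's events) can see them before the file is
closed. The distinction, as a small sketch against the hadoop-common API;
path and payload are placeholders:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HflushExample {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataOutputStream out = fs.create(new Path("/tmp/events.bin"));
        out.write("event".getBytes("UTF-8"));
        out.flush();   // drains client-side buffers only
        out.hflush();  // makes the data visible to new readers
        out.close();
      }
    }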

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/HistoryEvent.java Wed Nov  2 05:34:31 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Wed Nov  2 05:34:31 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapreduce.jobh
 
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -37,6 +39,8 @@ import org.apache.hadoop.mapreduce.TaskI
 import org.apache.hadoop.mapred.TaskStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 
 /**
  * Default Parser for the JobHistory files. Typical usage is
@@ -174,6 +178,9 @@ public class JobHistoryParser {
     case CLEANUP_ATTEMPT_FINISHED:
       handleTaskAttemptFinishedEvent((TaskAttemptFinishedEvent) event);
       break;
+    case AM_STARTED:
+      handleAMStartedEvent((AMStartedEvent) event);
+      break;
     default:
       break;
     }
@@ -202,6 +209,7 @@ public class JobHistoryParser {
     attemptInfo.sortFinishTime = event.getSortFinishTime();
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = event.getHostname();
+    attemptInfo.rackname = event.getRackName();
   }
 
   private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -214,6 +222,7 @@ public class JobHistoryParser {
     attemptInfo.mapFinishTime = event.getMapFinishTime();
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = event.getHostname();
+    attemptInfo.rackname = event.getRackname();
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -240,6 +249,8 @@ public class JobHistoryParser {
     attemptInfo.httpPort = event.getHttpPort();
     attemptInfo.trackerName = event.getTrackerName();
     attemptInfo.taskType = event.getTaskType();
+    attemptInfo.shufflePort = event.getShufflePort();
+    attemptInfo.containerId = event.getContainerId();
     
     taskInfo.attemptsMap.put(attemptId, attemptInfo);
   }
@@ -304,6 +315,21 @@ public class JobHistoryParser {
     info.totalReduces = event.getTotalReduces();
     info.uberized = event.getUberized();
   }
+  
+  private void handleAMStartedEvent(AMStartedEvent event) {
+    AMInfo amInfo = new AMInfo();
+    amInfo.appAttemptId = event.getAppAttemptId();
+    amInfo.startTime = event.getStartTime();
+    amInfo.containerId = event.getContainerId();
+    amInfo.nodeManagerHost = event.getNodeManagerHost();
+    amInfo.nodeManagerPort = event.getNodeManagerPort();
+    amInfo.nodeManagerHttpPort = event.getNodeManagerHttpPort();
+    if (info.amInfos == null) {
+      info.amInfos = new LinkedList<AMInfo>();
+    }
+    info.amInfos.add(amInfo);
+    info.latestAmInfo = amInfo;
+  }
 
   private void handleJobInfoChangeEvent(JobInfoChangeEvent event) {
     info.submitTime = event.getSubmitTime();
@@ -347,6 +373,8 @@ public class JobHistoryParser {
     Map<JobACL, AccessControlList> jobACLs;
     
     Map<TaskID, TaskInfo> tasksMap;
+    List<AMInfo> amInfos;
+    AMInfo latestAmInfo;
     boolean uberized;
     
     /** Create a job info object where job information will be stored
@@ -376,7 +404,9 @@ public class JobHistoryParser {
       System.out.println("REDUCE_COUNTERS:" + reduceCounters.toString());
       System.out.println("TOTAL_COUNTERS: " + totalCounters.toString());
       System.out.println("UBERIZED: " + uberized);
-
+      for (AMInfo amInfo : amInfos) {
+        amInfo.printAll();
+      }
       for (TaskInfo ti: tasksMap.values()) {
         ti.printAll();
       }
@@ -426,6 +456,10 @@ public class JobHistoryParser {
     public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }
     /** @return the uberized status of this job */
     public boolean getUberized() { return uberized; }
+    /** @return the list of AMInfos, one per AppMaster attempt of the job */
+    public List<AMInfo> getAMInfos() { return amInfos; }
+    /** @return the AMInfo for the newest AppMaster */
+    public AMInfo getLatestAMInfo() { return latestAmInfo; }
   }
   
   /**
@@ -506,7 +540,10 @@ public class JobHistoryParser {
     String trackerName;
     Counters counters;
     int httpPort;
+    int shufflePort;
     String hostname;
+    String rackname;
+    ContainerId containerId;
 
     /** Create a Task Attempt Info which will store attempt level information
      * on a history parse.
@@ -514,8 +551,9 @@ public class JobHistoryParser {
     public TaskAttemptInfo() {
       startTime = finishTime = shuffleFinishTime = sortFinishTime = 
         mapFinishTime = -1;
-      error =  state =  trackerName = hostname = "";
+      error =  state =  trackerName = hostname = rackname = "";
       httpPort = -1;
+      shufflePort = -1;
     }
     /**
      * Print all the information about this attempt.
@@ -530,6 +568,8 @@ public class JobHistoryParser {
       System.out.println("TASK_TYPE:" + taskType);
       System.out.println("TRACKER_NAME:" + trackerName);
       System.out.println("HTTP_PORT:" + httpPort);
+      System.out.println("SHUFFLE_PORT:" + shufflePort);
+      System.out.println("CONTIANER_ID:" + containerId);
       if (counters != null) {
         System.out.println("COUNTERS:" + counters.toString());
       }
@@ -559,9 +599,91 @@ public class JobHistoryParser {
     public String getTrackerName() { return trackerName; }
     /** @return the host name */
     public String getHostname() { return hostname; }
+    /** @return the rack name */
+    public String getRackname() { return rackname; }
     /** @return the counters for the attempt */
     public Counters getCounters() { return counters; }
     /** @return the HTTP port for the tracker */
     public int getHttpPort() { return httpPort; }
+    /** @return the shuffle port for the tracker */
+    public int getShufflePort() { return shufflePort; }
+    /** @return the ContainerId for the task attempt */
+    public ContainerId getContainerId() { return containerId; }
+  }
+
+  /**
+   * Stores AM information
+   */
+  public static class AMInfo {
+    ApplicationAttemptId appAttemptId;
+    long startTime;
+    ContainerId containerId;
+    String nodeManagerHost;
+    int nodeManagerPort;
+    int nodeManagerHttpPort;
+
+    /**
+     * Create an AMInfo which will store AM-level information on a history
+     * parse.
+     */
+    public AMInfo() {
+      startTime = -1;
+      nodeManagerHost = "";
+      nodeManagerHttpPort = -1;
+    }
+
+    public AMInfo(ApplicationAttemptId appAttemptId, long startTime,
+        ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
+        int nodeManagerHttpPort) {
+      this.appAttemptId = appAttemptId;
+      this.startTime = startTime;
+      this.containerId = containerId;
+      this.nodeManagerHost = nodeManagerHost;
+      this.nodeManagerPort = nodeManagerPort;
+      this.nodeManagerHttpPort = nodeManagerHttpPort;
+    }
+
+    /**
+     * Print all the information about this AM.
+     */
+    public void printAll() {
+      System.out.println("APPLICATION_ATTEMPT_ID:" + appAttemptId.toString());
+      System.out.println("START_TIME: " + startTime);
+      System.out.println("CONTAINER_ID: " + containerId.toString());
+      System.out.println("NODE_MANAGER_HOST: " + nodeManagerHost);
+      System.out.println("NODE_MANAGER_PORT: " + nodeManagerPort);
+      System.out.println("NODE_MANAGER_HTTP_PORT: " + nodeManagerHttpPort);
+    }
+
+    /** @return the ApplicationAttemptId */
+    public ApplicationAttemptId getAppAttemptId() {
+      return appAttemptId;
+    }
+
+    /** @return the start time of the AM */
+    public long getStartTime() {
+      return startTime;
+    }
+
+    /** @return the container id for the AM */
+    public ContainerId getContainerId() {
+      return containerId;
+    }
+
+    /** @return the host name for the node manager on which the AM is running */
+    public String getNodeManagerHost() {
+      return nodeManagerHost;
+    }
+
+    /** @return the port for the node manager running the AM */
+    public int getNodeManagerPort() {
+      return nodeManagerPort;
+    }
+
+    /** @return the http port for the node manager running the AM */
+    public int getNodeManagerHttpPort() {
+      return nodeManagerHttpPort;
+    }
   }
+
 }
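
With the AMStartedEvent handling above, a parsed JobInfo now carries one
AMInfo per AppMaster attempt. A sketch of reading that back, with the caveat
(visible in handleAMStartedEvent) that amInfos is only created once an
AM_STARTED event is seen, so getAMInfos() can return null for history files
that predate the event; the path is a placeholder:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;

    public class ListAMs {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        JobHistoryParser parser =
            new JobHistoryParser(fs, new Path("/history/job_x.jhist"));
        JobInfo jobInfo = parser.parse();
        List<AMInfo> amInfos = jobInfo.getAMInfos();
        if (amInfos != null) {  // null when no AM_STARTED event was logged
          for (AMInfo am : amInfos) {
            System.out.println(am.getAppAttemptId() + " on "
                + am.getNodeManagerHost() + ":" + am.getNodeManagerPort());
          }
        }
      }
    }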

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobInitedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobUnsuccessfulCompletionEvent.java Wed Nov  2 05:34:31 2011
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.JobID;
 
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record Failed and Killed completion of jobs
  *

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,17 +18,14 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapred.ProgressSplitsBlock;
-
-import org.apache.avro.util.Utf8;
 
 /**
  * Event to record successful completion of a map attempt
@@ -47,6 +44,7 @@ public class MapAttemptFinishedEvent  im
    * @param mapFinishTime Finish time of the map phase
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the map executed
+   * @param rackName Name of the rack where the map executed
    * @param state State string for the attempt
    * @param counters Counters for the attempt
    * @param allSplits the "splits", or a pixelated graph of various
@@ -59,7 +57,7 @@ public class MapAttemptFinishedEvent  im
    */
   public MapAttemptFinishedEvent
       (TaskAttemptID id, TaskType taskType, String taskStatus, 
-       long mapFinishTime, long finishTime, String hostname,
+       long mapFinishTime, long finishTime, String hostname, String rackName,
        String state, Counters counters,
        int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
@@ -69,6 +67,7 @@ public class MapAttemptFinishedEvent  im
     datum.mapFinishTime = mapFinishTime;
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
+    datum.rackname = new Utf8(rackName);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
 
@@ -107,7 +106,8 @@ public class MapAttemptFinishedEvent  im
       (TaskAttemptID id, TaskType taskType, String taskStatus, 
        long mapFinishTime, long finishTime, String hostname,
        String state, Counters counters) {
-    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, state, counters, null);
+    this(id, taskType, taskStatus, mapFinishTime, finishTime, hostname, "",
+        state, counters, null);
   }
   
   
@@ -136,6 +136,8 @@ public class MapAttemptFinishedEvent  im
   public long getFinishTime() { return datum.finishTime; }
   /** Get the host name */
   public String getHostname() { return datum.hostname.toString(); }
+  /** Get the rack name */
+  public String getRackname() { return datum.rackname.toString(); }
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters */
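
MapAttemptFinishedEvent thus gains a rackName parameter in its primary
constructor, and the old signature is kept as a compatibility shim that
passes an empty rack; ReduceAttemptFinishedEvent below gets the same
treatment. A sketch of emitting the rack-aware form, in which the ids,
times, host, and rack are placeholders:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;

    public class EmitMapFinished {
      public static void main(String[] args) {
        TaskAttemptID id =
            TaskAttemptID.forName("attempt_1320206566668_0001_m_000000_0");
        MapAttemptFinishedEvent event = new MapAttemptFinishedEvent(
            id, TaskType.MAP, "SUCCEEDED",
            1000L /* mapFinishTime */, 2000L /* finishTime */,
            "host1.example.com", "/default-rack",
            "SUCCEEDED", new Counters(), null /* allSplits */);
        System.out.println(event.getHostname() + " " + event.getRackname());
      }
    }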

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,19 +18,15 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.ProgressSplitsBlock;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import org.apache.hadoop.mapred.ProgressSplitsBlock;
-
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record successful completion of a reduce attempt
  *
@@ -50,6 +46,7 @@ public class ReduceAttemptFinishedEvent 
    * @param sortFinishTime Finish time of the sort phase
    * @param finishTime Finish time of the attempt
    * @param hostname Name of the host where the attempt executed
+   * @param rackName Name of the rack where the attempt executed
    * @param state State of the attempt
    * @param counters Counters for the attempt
    * @param allSplits the "splits", or a pixelated graph of various
@@ -60,7 +57,7 @@ public class ReduceAttemptFinishedEvent 
   public ReduceAttemptFinishedEvent
     (TaskAttemptID id, TaskType taskType, String taskStatus, 
      long shuffleFinishTime, long sortFinishTime, long finishTime,
-     String hostname, String state, Counters counters,
+     String hostname, String rackName, String state, Counters counters,
      int[][] allSplits) {
     datum.taskid = new Utf8(id.getTaskID().toString());
     datum.attemptId = new Utf8(id.toString());
@@ -70,6 +67,7 @@ public class ReduceAttemptFinishedEvent 
     datum.sortFinishTime = sortFinishTime;
     datum.finishTime = finishTime;
     datum.hostname = new Utf8(hostname);
+    datum.rackname = new Utf8(rackName);
     datum.state = new Utf8(state);
     datum.counters = EventWriter.toAvro(counters);
 
@@ -110,7 +108,7 @@ public class ReduceAttemptFinishedEvent 
      String hostname, String state, Counters counters) {
     this(id, taskType, taskStatus,
          shuffleFinishTime, sortFinishTime, finishTime,
-         hostname, state, counters, null);
+         hostname, "", state, counters, null);
   }
 
   ReduceAttemptFinishedEvent() {}
@@ -140,6 +138,8 @@ public class ReduceAttemptFinishedEvent 
   public long getFinishTime() { return datum.finishTime; }
   /** Get the name of the host where the attempt ran */
   public String getHostname() { return datum.hostname.toString(); }
+  /** Get the rack name of the node where the attempt ran */
+  public String getRackName() { return datum.rackname.toString(); }
   /** Get the state string */
   public String getState() { return datum.state.toString(); }
   /** Get the counters for the attempt */

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptStartedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,13 +18,13 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import org.apache.avro.util.Utf8;
 
@@ -44,16 +44,28 @@ public class TaskAttemptStartedEvent imp
    * @param startTime Start time of the attempt
    * @param trackerName Name of the Task Tracker where attempt is running
    * @param httpPort The port number of the tracker
+   * @param shufflePort The shuffle port number of the container
+   * @param containerId The containerId for the task attempt.
    */
   public TaskAttemptStartedEvent( TaskAttemptID attemptId,  
       TaskType taskType, long startTime, String trackerName,
-      int httpPort) {
+      int httpPort, int shufflePort, ContainerId containerId) {
     datum.attemptId = new Utf8(attemptId.toString());
     datum.taskid = new Utf8(attemptId.getTaskID().toString());
     datum.startTime = startTime;
     datum.taskType = new Utf8(taskType.name());
     datum.trackerName = new Utf8(trackerName);
     datum.httpPort = httpPort;
+    datum.shufflePort = shufflePort;
+    datum.containerId = new Utf8(containerId.toString());
+  }
+
+  // TODO Remove after MrV1 is removed.
+  // Using a dummy containerId to prevent jobHistory parse failures.
+  public TaskAttemptStartedEvent(TaskAttemptID attemptId, TaskType taskType,
+      long startTime, String trackerName, int httpPort, int shufflePort) {
+    this(attemptId, taskType, startTime, trackerName, httpPort, shufflePort,
+        ConverterUtils.toContainerId("container_-1_-1_-1_-1"));
   }
 
   TaskAttemptStartedEvent() {}
@@ -75,6 +87,8 @@ public class TaskAttemptStartedEvent imp
   }
   /** Get the HTTP port */
   public int getHttpPort() { return datum.httpPort; }
+  /** Get the shuffle port */
+  public int getShufflePort() { return datum.shufflePort; }
   /** Get the attempt id */
   public TaskAttemptID getTaskAttemptId() {
     return TaskAttemptID.forName(datum.attemptId.toString());
@@ -87,5 +101,8 @@ public class TaskAttemptStartedEvent imp
            ? EventType.MAP_ATTEMPT_STARTED 
            : EventType.REDUCE_ATTEMPT_STARTED;
   }
-
+  /** Get the ContainerId */
+  public ContainerId getContainerId() {
+    return ConverterUtils.toContainerId(datum.containerId.toString());
+  }
 }
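
TaskAttemptStartedEvent now records the shuffle port and the yarn
ContainerId, with the MRv1 compatibility constructor filling in the dummy
container id shown above. A sketch of the round trip through the new
accessor; the ids and ports are placeholders in the same
container_<clusterTs>_<app>_<attempt>_<container> format the dummy uses:

    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class AttemptStartedExample {
      public static void main(String[] args) {
        TaskAttemptID id =
            TaskAttemptID.forName("attempt_1320206566668_0001_m_000000_0");
        ContainerId cid = ConverterUtils.toContainerId(
            "container_1320206566668_0001_01_000002");
        TaskAttemptStartedEvent event = new TaskAttemptStartedEvent(
            id, TaskType.MAP, 1000L /* startTime */,
            "host1.example.com:8041", -1 /* httpPort */,
            8080 /* shufflePort */, cid);
        System.out.println("shuffle=" + event.getShufflePort()
            + " container=" + event.getContainerId());
      }
    }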

Modified: hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskStartedEvent.java Wed Nov  2 05:34:31 2011
@@ -18,15 +18,12 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
+import org.apache.avro.util.Utf8;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
-import org.apache.avro.util.Utf8;
-
 /**
  * Event to record the start of a task
  *


