hadoop-mapreduce-commits mailing list archives

From sur...@apache.org
Subject svn commit: r1407217 [1/2] - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/...
Date Thu, 08 Nov 2012 19:10:01 GMT
Author: suresh
Date: Thu Nov  8 19:09:46 2012
New Revision: 1407217

URL: http://svn.apache.org/viewvc?rev=1407217&view=rev
Log:
Merging trunk to branch-trunk-win branch

Added:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
      - copied unchanged from r1407201, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestAMInfos.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
      - copied unchanged from r1407201, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestEventFetcher.java
Modified:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/QueueAclsInfo.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/QueueInfo.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/TaskReport.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounterGroup.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/AbstractCounters.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/GenericCounter.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/EventReader.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/TaggedInputSplit.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/partition/KeyFieldBasedPartitioner.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Shuffle.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/CountersStrings.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsView.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestIFile.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestKeyFieldBasedPartitioner.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestUmbilicalProtocolWithJobToken.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/c++/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/block_forensics/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/build-contrib.xml   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/build.xml   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/data_join/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/eclipse-plugin/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/index/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/contrib/vaidya/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/examples/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/java/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/src/webapps/job/   (props changed)

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1401063-1407201

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Thu Nov  8 19:09:46 2012
@@ -189,6 +189,14 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-4736. Remove obsolete option [-rootDir] from TestDFSIO.
     (Brandon Li via suresh)
 
+    MAPREDUCE-4637. Handle TaskAttempt diagnostic updates while in the NEW and 
+    UNASSIGNED states. (Mayank Bansal via sseth)
+
+    MAPREDUCE-1806. CombineFileInputFormat does not work with paths not on default FS. (Gera Shegalov via tucu)
+
+    MAPREDUCE-4777. In TestIFile, testIFileReaderWithCodec relies on
+    testIFileWriterWithCodec. (Sandy Ryza via tomwhite)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -576,6 +584,9 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4596. Split StateMachine state from states seen by MRClientProtocol
     for Job, Task and TaskAttempt. (Siddarth Seth via vinodkv) 
 
+    MAPREDUCE-4752. Reduce MR AM memory usage through String Interning (Robert
+    Evans via tgraves)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -602,6 +613,34 @@ Release 0.23.5 - UNRELEASED
     MAPREDUCE-4740. only .jars can be added to the Distributed Cache
     classpath. (Robert Joseph Evans via jlowe)
 
+    MAPREDUCE-4229. Intern counter names in the JT (Miomir Boljanovic and bobby via daryn)
+
+    MAPREDUCE-4741. WARN and ERROR messages logged during normal AM shutdown.
+    (Vinod Kumar Vavilapalli via jlowe)
+
+    MAPREDUCE-4730. Fix Reducer's EventFetcher to scale the map-completion
+    requests slowly to avoid HADOOP-8942. (Jason Lowe via vinodkv)
+
+    MAPREDUCE-4748. Invalid event: T_ATTEMPT_SUCCEEDED at SUCCEEDED. (jlowe)
+
+    MAPREDUCE-4724. job history web ui applications page should be sorted to
+    display last app first (tgraves via bobby)
+
+    MAPREDUCE-4746. The MR Application Master does not have a config to set
+    environment variables (Rob Parker via bobby)
+
+    MAPREDUCE-4729. job history UI not showing all job attempts. (Vinod
+    Kumar Vavilapalli via jlowe)
+
+    MAPREDUCE-4763 repair test TestUmbilicalProtocolWithJobToken (Ivan A.
+    Veselovsky via bobby)
+
+    MAPREDUCE-4771. KeyFieldBasedPartitioner not partitioning properly when
+    configured (jlowe via bobby)
+
+    MAPREDUCE-4772. Fetch failures can take way too long for a map to be 
+    restarted (bobby)
+ 
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1401063-1407201

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1401063-1407201

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java Thu Nov  8 19:09:46 2012
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.security.authorize.MRAMPolicyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.CompositeService;
 
@@ -280,6 +281,7 @@ public class TaskAttemptListenerImpl ext
   @Override
   public void reportDiagnosticInfo(TaskAttemptID taskAttemptID, String diagnosticInfo)
  throws IOException {
+    diagnosticInfo = StringInterner.weakIntern(diagnosticInfo);
     LOG.info("Diagnostics report from " + taskAttemptID.toString() + ": "
         + diagnosticInfo);
 

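The interning added above deduplicates diagnostic strings, which tend to recur byte-for-byte across many task attempts and so bloat the AM heap. A minimal sketch of the weak-interning pattern behind StringInterner.weakIntern, assuming Guava on the classpath; the class name and sample strings are illustrative, not Hadoop code:

    import com.google.common.collect.Interner;
    import com.google.common.collect.Interners;

    // Hypothetical sketch of weak string interning, analogous in spirit to
    // org.apache.hadoop.util.StringInterner.weakIntern(String).
    public class WeakInternSketch {
      // Weak references let interned strings be garbage-collected once no
      // caller still holds them, so the cache cannot grow without bound.
      private static final Interner<String> INTERNER = Interners.newWeakInterner();

      static String weakIntern(String s) {
        return s == null ? null : INTERNER.intern(s);
      }

      public static void main(String[] args) {
        // Distinct String objects with equal contents collapse to one instance.
        String a = weakIntern(new String("Container killed on request"));
        String b = weakIntern(new String("Container killed on request"));
        System.out.println(a == b); // true: both share the interned instance
      }
    }

Unlike String.intern(), which on older JVMs pinned strings in PermGen, a weak interner only keeps strings alive while something else references them.
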
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Thu Nov  8 19:09:46 2012
@@ -23,14 +23,17 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -45,6 +48,9 @@ import org.apache.hadoop.mapreduce.Outpu
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.AMStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.EventReader;
+import org.apache.hadoop.mapreduce.jobhistory.EventType;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
@@ -89,6 +95,7 @@ import org.apache.hadoop.security.Creden
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.ClusterInfo;
 import org.apache.hadoop.yarn.SystemClock;
@@ -107,6 +114,8 @@ import org.apache.hadoop.yarn.service.Co
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -398,52 +407,65 @@ public class MRAppMaster extends Composi
   protected void sysexit() {
     System.exit(0);
   }
-  
-  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
-    @Override
-    public void handle(JobFinishEvent event) {
-      // job has finished
-      // this is the only job, so shut down the Appmaster
-      // note in a workflow scenario, this may lead to creation of a new
-      // job (FIXME?)
-      // Send job-end notification
-      if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
-        try {
-          LOG.info("Job end notification started for jobID : "
-              + job.getReport().getJobId());
-          JobEndNotifier notifier = new JobEndNotifier();
-          notifier.setConf(getConfig());
-          notifier.notify(job.getReport());
-        } catch (InterruptedException ie) {
-          LOG.warn("Job end notification interrupted for jobID : "
-              + job.getReport().getJobId(), ie);
-        }
-      }
 
-      // TODO:currently just wait for some time so clients can know the
-      // final states. Will be removed once RM come on.
+  @VisibleForTesting
+  public void shutDownJob() {
+    // job has finished
+    // this is the only job, so shut down the Appmaster
+    // note in a workflow scenario, this may lead to creation of a new
+    // job (FIXME?)
+    // Send job-end notification
+    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
       try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOG.info("Job end notification started for jobID : "
+            + job.getReport().getJobId());
+        JobEndNotifier notifier = new JobEndNotifier();
+        notifier.setConf(getConfig());
+        notifier.notify(job.getReport());
+      } catch (InterruptedException ie) {
+        LOG.warn("Job end notification interrupted for jobID : "
+            + job.getReport().getJobId(), ie);
       }
+    }
 
-      try {
-        //We are finishing cleanly so this is the last retry
-        isLastAMRetry = true;
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        LOG.info("Calling stop for all the services");
-        stop();
+    // TODO:currently just wait for some time so clients can know the
+    // final states. Will be removed once RM come on.
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
 
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
-      }
+    try {
+      //We are finishing cleanly so this is the last retry
+      isLastAMRetry = true;
+      // Stop all services
+      // This will also send the final report to the ResourceManager
+      LOG.info("Calling stop for all the services");
+      MRAppMaster.this.stop();
 
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      LOG.info("Exiting MR AppMaster..GoodBye!");
-      sysexit();
+    } catch (Throwable t) {
+      LOG.warn("Graceful stop failed ", t);
+    }
+
+    //Bring the process down by force.
+    //Not needed after HADOOP-7140
+    LOG.info("Exiting MR AppMaster..GoodBye!");
+    sysexit();   
+  }
+ 
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // Create a new thread to shutdown the AM. We should not do it in-line
+      // to avoid blocking the dispatcher itself.
+      new Thread() {
+        
+        @Override
+        public void run() {
+          shutDownJob();
+        }
+      }.start();
     }
   }
   
@@ -811,16 +833,21 @@ public class MRAppMaster extends Composi
   @Override
   public void start() {
 
+    amInfos = new LinkedList<AMInfo>();
+
     // Pull completedTasks etc from recovery
     if (inRecovery) {
       completedTasksFromPreviousRun = recoveryServ.getCompletedTasks();
       amInfos = recoveryServ.getAMInfos();
+    } else {
+      // Get the amInfos anyways irrespective of whether recovery is enabled or
+      // not IF this is not the first AM generation
+      if (appAttemptID.getAttemptId() != 1) {
+        amInfos.addAll(readJustAMInfos());
+      }
     }
 
-    // / Create the AMInfo for the current AppMaster
-    if (amInfos == null) {
-      amInfos = new LinkedList<AMInfo>();
-    }
+    // Create an AMInfo for the current AM generation.
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
@@ -878,6 +905,51 @@ public class MRAppMaster extends Composi
     startJobs();
   }
 
+  private List<AMInfo> readJustAMInfos() {
+    List<AMInfo> amInfos = new ArrayList<AMInfo>();
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream =
+          RecoveryService.getPreviousJobHistoryFileStream(getConfig(),
+            appAttemptID);
+      EventReader jobHistoryEventReader = new EventReader(inputStream);
+
+      // All AMInfos are contiguous. Track when the first AMStartedEvent
+      // appears.
+      boolean amStartedEventsBegan = false;
+
+      HistoryEvent event;
+      while ((event = jobHistoryEventReader.getNextEvent()) != null) {
+        if (event.getEventType() == EventType.AM_STARTED) {
+          if (!amStartedEventsBegan) {
+            // First AMStartedEvent.
+            amStartedEventsBegan = true;
+          }
+          AMStartedEvent amStartedEvent = (AMStartedEvent) event;
+          amInfos.add(MRBuilderUtils.newAMInfo(
+            amStartedEvent.getAppAttemptId(), amStartedEvent.getStartTime(),
+            amStartedEvent.getContainerId(),
+            StringInterner.weakIntern(amStartedEvent.getNodeManagerHost()),
+            amStartedEvent.getNodeManagerPort(),
+            amStartedEvent.getNodeManagerHttpPort()));
+        } else if (amStartedEventsBegan) {
+          // This means AMStartedEvents began and this event is a
+          // non-AMStarted event.
+          // No need to continue reading all the other events.
+          break;
+        }
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not parse the old history file. "
+          + "Will not have old AMinfos ", e);
+    } finally {
+      if (inputStream != null) {
+        IOUtils.closeQuietly(inputStream);
+      }
+    }
+    return amInfos;
+  }
+
   /**
    * This can be overridden to instantiate multiple jobs and create a 
    * workflow.

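The key structural change in the MRAppMaster diff above is that the job-finish handler now performs the blocking shutdown sequence on a fresh thread: stopping services from inside the dispatcher's own event-handling thread would block the dispatcher (and a join on that thread would deadlock). A self-contained sketch of that hazard and its fix, with illustrative names rather than the YARN service API:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative sketch: why a finish handler must not stop the
    // dispatcher from within the dispatcher's own thread.
    public class AsyncShutdownSketch {
      private final BlockingQueue<Runnable> eventQueue = new LinkedBlockingQueue<>();
      private final Thread dispatcher = new Thread(() -> {
        try {
          while (!Thread.currentThread().isInterrupted()) {
            eventQueue.take().run(); // handlers run on the dispatcher thread
          }
        } catch (InterruptedException e) {
          // expected during stop(); exit quietly
        }
      }, "dispatcher");

      void start() { dispatcher.start(); }

      void stop() throws InterruptedException {
        dispatcher.interrupt();
        dispatcher.join(); // would deadlock if called from the dispatcher itself
      }

      void onJobFinish() {
        // Runs on the dispatcher thread, so spawn a separate thread for the
        // blocking stop()/join(), mirroring the JobFinishEventHandler above.
        new Thread(() -> {
          try {
            stop();
          } catch (InterruptedException ignored) { }
        }, "job-shutdown").start();
      }

      public static void main(String[] args) {
        AsyncShutdownSketch s = new AsyncShutdownSketch();
        s.start();
        s.eventQueue.add(s::onJobFinish); // deliver a "job finished" event
      }
    }
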
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Nov  8 19:09:46 2012
@@ -68,6 +68,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -1409,16 +1410,22 @@ public class JobImpl implements org.apac
         fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
         job.fetchFailuresMapping.put(mapId, fetchFailures);
         
-        //get number of running reduces
-        int runningReduceTasks = 0;
+        //get number of shuffling reduces
+        int shufflingReduceTasks = 0;
         for (TaskId taskId : job.reduceTasks) {
-          if (TaskState.RUNNING.equals(job.tasks.get(taskId).getState())) {
-            runningReduceTasks++;
+          Task task = job.tasks.get(taskId);
+          if (TaskState.RUNNING.equals(task.getState())) {
+            for(TaskAttempt attempt : task.getAttempts().values()) {
+              if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
+                shufflingReduceTasks++;
+                break;
+              }
+            }
           }
         }
         
-        float failureRate = runningReduceTasks == 0 ? 1.0f : 
-          (float) fetchFailures / runningReduceTasks;
+        float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
+          (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures
         boolean isMapFaulty =
             (failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION);

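The JobImpl hunk above narrows the fetch-failure denominator from all running reducers to reducers still in the SHUFFLE phase, so reducers that have already moved on to reducing no longer skew the rate and delay a map restart (MAPREDUCE-4772). A hedged sketch of the resulting check; the 0.5 threshold is assumed here for illustration, not read from the patch:

    // Illustrative sketch of the failure-rate check in JobImpl; the 0.5f
    // value stands in for MAX_ALLOWED_FETCH_FAILURES_FRACTION and is an
    // assumption here, not the committed constant.
    public class FetchFailureSketch {
      static final float MAX_ALLOWED_FETCH_FAILURES_FRACTION = 0.5f;

      static boolean isMapFaulty(int fetchFailures, int shufflingReduceTasks) {
        // With no shuffling reducers left, any failure counts as rate 1.0,
        // matching the guard in the patch above.
        float failureRate = shufflingReduceTasks == 0
            ? 1.0f
            : (float) fetchFailures / shufflingReduceTasks;
        return failureRate >= MAX_ALLOWED_FETCH_FAILURES_FRACTION;
      }

      public static void main(String[] args) {
        System.out.println(isMapFaulty(3, 3)); // rate 1.0  -> true, re-run map
        System.out.println(isMapFaulty(1, 4)); // rate 0.25 -> false, wait
      }
    }
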
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Nov  8 19:09:46 2012
@@ -105,6 +105,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
@@ -200,6 +201,10 @@ public abstract class TaskAttemptImpl im
          TaskAttemptEventType.TA_KILL, new KilledTransition())
      .addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+     .addTransition(TaskAttemptStateInternal.NEW,
+          TaskAttemptStateInternal.NEW,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the UNASSIGNED state.
      .addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -211,6 +216,10 @@ public abstract class TaskAttemptImpl im
      .addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
          TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
              TaskAttemptStateInternal.FAILED, true))
+     .addTransition(TaskAttemptStateInternal.UNASSIGNED,
+          TaskAttemptStateInternal.UNASSIGNED,
+          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
 
      // Transitions from the ASSIGNED state.
      .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@@ -932,7 +941,6 @@ public abstract class TaskAttemptImpl im
       Counters counters = reportedStatus.counters;
       if (counters == null) {
         counters = EMPTY_COUNTERS;
-//        counters.groups = new HashMap<String, CounterGroup>();
       }
       return counters;
     } finally {
@@ -1254,9 +1262,10 @@ public abstract class TaskAttemptImpl im
         (TaskAttemptContainerAssignedEvent) event;
       taskAttempt.containerID = cEvent.getContainer().getId();
       taskAttempt.containerNodeId = cEvent.getContainer().getNodeId();
-      taskAttempt.containerMgrAddress = taskAttempt.containerNodeId
-          .toString();
-      taskAttempt.nodeHttpAddress = cEvent.getContainer().getNodeHttpAddress();
+      taskAttempt.containerMgrAddress = StringInterner.weakIntern(
+          taskAttempt.containerNodeId.toString());
+      taskAttempt.nodeHttpAddress = StringInterner.weakIntern(
+          cEvent.getContainer().getNodeHttpAddress());
       taskAttempt.nodeRackName = RackResolver.resolve(
           taskAttempt.containerNodeId.getHost()).getNetworkLocation();
       taskAttempt.containerToken = cEvent.getContainer().getContainerToken();
@@ -1702,7 +1711,6 @@ public abstract class TaskAttemptImpl im
     result.stateString = "NEW";
     result.taskState = TaskAttemptState.NEW;
     Counters counters = EMPTY_COUNTERS;
-    //    counters.groups = new HashMap<String, CounterGroup>();
     result.counters = counters;
   }
 

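The new transitions above make TA_DIAGNOSTICS_UPDATE a self-transition in the NEW and UNASSIGNED states (MAPREDUCE-4637), so diagnostics that arrive before a container is assigned are recorded instead of raising an invalid-event error. A framework-free sketch of the idea; the enum and method names are illustrative, not the YARN StateMachineFactory API:

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative self-transition: diagnostics arriving early are stored
    // and the attempt stays in its current state.
    public class DiagnosticsTransitionSketch {
      enum State { NEW, UNASSIGNED, ASSIGNED, RUNNING }

      private State state = State.NEW;
      private final List<String> diagnostics = new ArrayList<>();

      void onDiagnosticsUpdate(String report) {
        switch (state) {
          case NEW:
          case UNASSIGNED:
          case RUNNING:
            diagnostics.add(report); // record; state unchanged (self-transition)
            break;
          default:
            throw new IllegalStateException(
                "Invalid event: TA_DIAGNOSTICS_UPDATE at " + state);
        }
      }

      public static void main(String[] args) {
        DiagnosticsTransitionSketch attempt = new DiagnosticsTransitionSketch();
        // Before the patch, an update in NEW was an invalid event; with the
        // self-transition it is simply accumulated.
        attempt.onDiagnosticsUpdate("Container request still pending");
        System.out.println(attempt.diagnostics);
      }
    }
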
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Nov  8 19:09:46 2012
@@ -75,6 +75,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -203,7 +204,10 @@ public abstract class TaskImpl implement
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
-            TaskEventType.T_ATTEMPT_LAUNCHED))
+            TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+            TaskEventType.T_ATTEMPT_LAUNCHED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED,
+            TaskEventType.T_KILL))
 
     // Transitions from FAILED state        
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
@@ -664,9 +668,9 @@ public abstract class TaskImpl implement
           .newRecordInstance(TaskAttemptCompletionEvent.class);
       tce.setEventId(-1);
       String scheme = (encryptedShuffle) ? "https://" : "http://";
-      tce.setMapOutputServerAddress(scheme
+      tce.setMapOutputServerAddress(StringInterner.weakIntern(scheme
          + attempt.getNodeHttpAddress().split(":")[0] + ":"
-         + attempt.getShufflePort());
+         + attempt.getShufflePort()));
       tce.setStatus(status);
       tce.setAttemptId(attempt.getID());
       int runTime = 0;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Thu Nov  8 19:09:46 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -81,6 +82,7 @@ public class ContainerLauncherImpl exten
   protected BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   YarnRPC rpc;
+  private final AtomicBoolean stopped;
 
   private Container getContainer(ContainerLauncherEvent event) {
     ContainerId id = event.getContainerID();
@@ -237,6 +239,7 @@ public class ContainerLauncherImpl exten
   public ContainerLauncherImpl(AppContext context) {
     super(ContainerLauncherImpl.class.getName());
     this.context = context;
+    this.stopped = new AtomicBoolean(false);
   }
 
   @Override
@@ -271,11 +274,13 @@ public class ContainerLauncherImpl exten
       @Override
       public void run() {
         ContainerLauncherEvent event = null;
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
             event = eventQueue.take();
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
           }
           int poolSize = launcherPool.getCorePoolSize();
@@ -324,6 +329,10 @@ public class ContainerLauncherImpl exten
   }
 
   public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     // shutdown any containers that might be left running
     shutdownAllContainers();
     eventHandlingThread.interrupt();

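The AtomicBoolean introduced above makes ContainerLauncherImpl.stop() idempotent and keeps an expected interrupt during shutdown from being logged as an error (MAPREDUCE-4741); the RMCommunicator, RMContainerAllocator, and TaskCleanerImpl hunks below apply the same pattern. A compact, self-contained sketch with illustrative names:

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicBoolean;

    // Illustrative service skeleton showing the stopped-flag pattern:
    // idempotent stop() plus a quiet exit when interrupted on purpose.
    public class StoppableServiceSketch {
      private final AtomicBoolean stopped = new AtomicBoolean(false);
      private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();
      private Thread eventHandlingThread;

      public void start() {
        eventHandlingThread = new Thread(() -> {
          while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
            String event;
            try {
              event = eventQueue.take();
            } catch (InterruptedException e) {
              if (!stopped.get()) {
                // Only a real error if nobody asked us to stop.
                System.err.println("Returning, interrupted : " + e);
              }
              return;
            }
            System.out.println("handling " + event);
          }
        }, "event-handler");
        eventHandlingThread.start();
      }

      public void stop() {
        if (stopped.getAndSet(true)) {
          return; // already stopped; a second stop() is now harmless
        }
        eventHandlingThread.interrupt();
      }

      public static void main(String[] args) {
        StoppableServiceSketch s = new StoppableServiceSketch();
        s.start();
        s.eventQueue.add("container-launch");
        s.stop();
        s.stop(); // no-op thanks to getAndSet
      }
    }

Checking the flag before logging is what silences the spurious "Returning, interrupted" ERROR during a normal AM shutdown.
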
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Thu Nov  8 19:09:46 2012
@@ -34,7 +34,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -178,26 +177,13 @@ public class RecoveryService extends Com
   }
 
   private void parse() throws IOException {
-    // TODO: parse history file based on startCount
-    String jobName = 
-        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
-    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
-    FSDataInputStream in = null;
-    Path historyFile = null;
-    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
-        new Path(jobhistoryDir));
-    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
-        getConfig());
-    //read the previous history file
-    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
-        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
-    LOG.info("History file is at " + historyFile);
-    in = fc.open(historyFile);
+    FSDataInputStream in =
+        getPreviousJobHistoryFileStream(getConfig(), applicationAttemptId);
     JobHistoryParser parser = new JobHistoryParser(in);
     jobInfo = parser.parse();
     Exception parseException = parser.getParseException();
     if (parseException != null) {
-      LOG.info("Got an error parsing job-history file " + historyFile + 
+      LOG.info("Got an error parsing job-history file" + 
           ", ignoring incomplete events.", parseException);
     }
     Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
@@ -213,6 +199,28 @@ public class RecoveryService extends Com
     LOG.info("Read completed tasks from history "
         + completedTasks.size());
   }
+
+  public static FSDataInputStream getPreviousJobHistoryFileStream(
+      Configuration conf, ApplicationAttemptId applicationAttemptId)
+      throws IOException {
+    FSDataInputStream in = null;
+    Path historyFile = null;
+    String jobName =
+        TypeConverter.fromYarn(applicationAttemptId.getApplicationId())
+          .toString();
+    String jobhistoryDir =
+        JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf);
+    Path histDirPath =
+        FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
+    // read the previous history file
+    historyFile =
+        fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(histDirPath,
+          jobName, (applicationAttemptId.getAttemptId() - 1)));
+    LOG.info("History file is at " + historyFile);
+    in = fc.open(historyFile);
+    return in;
+  }
   
   protected Dispatcher createRecoveryDispatcher() {
     return new RecoveryDispatcher();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Thu Nov  8 19:09:46 2012
@@ -67,7 +67,7 @@ public abstract class RMCommunicator ext
   private int rmPollInterval;//millis
   protected ApplicationId applicationId;
   protected ApplicationAttemptId applicationAttemptId;
-  private AtomicBoolean stopped;
+  private final AtomicBoolean stopped;
   protected Thread allocatorThread;
   @SuppressWarnings("rawtypes")
   protected EventHandler eventHandler;
@@ -239,7 +239,9 @@ public abstract class RMCommunicator ext
               // TODO: for other exceptions
             }
           } catch (InterruptedException e) {
-            LOG.warn("Allocated thread interrupted. Returning.");
+            if (!stopped.get()) {
+              LOG.warn("Allocated thread interrupted. Returning.");
+            }
             return;
           }
         }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Nov  8 19:09:46 2012
@@ -32,6 +32,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -84,7 +86,7 @@ public class RMContainerAllocator extend
   private static final Priority PRIORITY_MAP;
 
   private Thread eventHandlingThread;
-  private volatile boolean stopEventHandling;
+  private final AtomicBoolean stopped;
 
   static {
     PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
@@ -145,6 +147,7 @@ public class RMContainerAllocator extend
 
   public RMContainerAllocator(ClientService clientService, AppContext context) {
     super(clientService, context);
+    this.stopped = new AtomicBoolean(false);
   }
 
   @Override
@@ -176,11 +179,13 @@ public class RMContainerAllocator extend
 
         ContainerAllocatorEvent event;
 
-        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
             event = RMContainerAllocator.this.eventQueue.take();
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
           }
 
@@ -234,7 +239,10 @@ public class RMContainerAllocator extend
 
   @Override
   public void stop() {
-    this.stopEventHandling = true;
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     eventHandlingThread.interrupt();
     super.stop();
     LOG.info("Final Stats: " + getStat());
@@ -613,7 +621,7 @@ public class RMContainerAllocator extend
         eventHandler.handle(new TaskAttemptEvent(attemptID,
             TaskAttemptEventType.TA_CONTAINER_COMPLETED));
         // Send the diagnostics
-        String diagnostics = cont.getDiagnostics();
+        String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
       }      

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/taskclean/TaskCleanerImpl.java Thu Nov  8 19:09:46 2012
@@ -23,6 +23,7 @@ import java.util.concurrent.LinkedBlocki
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -43,10 +44,12 @@ public class TaskCleanerImpl extends Abs
   private Thread eventHandlingThread;
   private BlockingQueue<TaskCleanupEvent> eventQueue =
       new LinkedBlockingQueue<TaskCleanupEvent>();
+  private final AtomicBoolean stopped;
 
   public TaskCleanerImpl(AppContext context) {
     super("TaskCleaner");
     this.context = context;
+    this.stopped = new AtomicBoolean(false);
   }
 
   public void start() {
@@ -59,11 +62,13 @@ public class TaskCleanerImpl extends Abs
       @Override
       public void run() {
         TaskCleanupEvent event = null;
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
           try {
             event = eventQueue.take();
           } catch (InterruptedException e) {
-            LOG.error("Returning, interrupted : " + e);
+            if (!stopped.get()) {
+              LOG.error("Returning, interrupted : " + e);
+            }
             return;
           }
           // the events from the queue are handled in parallel
@@ -77,6 +82,10 @@ public class TaskCleanerImpl extends Abs
   }
 
   public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
     eventHandlingThread.interrupt();
     launcherPool.shutdown();
     super.stop();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java Thu Nov  8 19:09:46 2012
@@ -18,14 +18,19 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Assert;
 import org.junit.Test;
@@ -254,6 +260,169 @@ public class TestFetchFailure {
     events = job.getTaskAttemptCompletionEvents(0, 100);
     Assert.assertEquals("Num completion events not correct", 2, events.length);
   }
+  
+  @Test
+  public void testFetchFailureMultipleReduces() throws Exception {
+    MRApp app = new MRApp(1, 3, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+       4, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    Task reduceTask2 = it.next();
+    Task reduceTask3 = it.next();
+    
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    TaskAttemptCompletionEvent[] events = 
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+    
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    app.waitForState(reduceTask2, TaskState.RUNNING);
+    app.waitForState(reduceTask3, TaskState.RUNNING);
+    TaskAttempt reduceAttempt = 
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+    
+    updateStatus(app, reduceAttempt, Phase.SHUFFLE);
+    
+    TaskAttempt reduceAttempt2 = 
+      reduceTask2.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt2, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt2, Phase.SHUFFLE);
+    
+    TaskAttempt reduceAttempt3 = 
+      reduceTask3.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt3, TaskAttemptState.RUNNING);
+    updateStatus(app, reduceAttempt3, Phase.SHUFFLE);
+    
+    //send 3 fetch failures from reduce to trigger map re execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    
+    //We should not re-launch the map task yet
+    Assert.assertEquals(TaskState.SUCCEEDED, mapTask.getState());
+    updateStatus(app, reduceAttempt2, Phase.REDUCE);
+    updateStatus(app, reduceAttempt3, Phase.REDUCE);
+    
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    
+    // wait for the map task to move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    
+    // map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("Num attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+    
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+    
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+    // send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    // send done to the first reduce attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // send done to the second reduce attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // send done to the third reduce attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt3.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    // the previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        6, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].getAttemptId());
+    Assert.assertEquals("Event reduce attempt id not correct",
+        reduceAttempt.getID(), events[3].getAttemptId());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+
+    TaskAttemptCompletionEvent[] mapEvents =
+        job.getMapAttemptCompletionEvents(0, 2);
+    Assert.assertEquals("Incorrect number of map events", 2, mapEvents.length);
+    Assert.assertArrayEquals("Unexpected map events",
+        Arrays.copyOfRange(events, 0, 2), mapEvents);
+    mapEvents = job.getMapAttemptCompletionEvents(2, 200);
+    Assert.assertEquals("Incorrect number of map events", 1, mapEvents.length);
+    Assert.assertEquals("Unexpected map event", events[2], mapEvents[0]);
+  }
+
+
+  private void updateStatus(MRApp app, TaskAttempt attempt, Phase phase) {
+    TaskAttemptStatusUpdateEvent.TaskAttemptStatus status = new TaskAttemptStatusUpdateEvent.TaskAttemptStatus();
+    status.counters = new Counters();
+    status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+    status.id = attempt.getID();
+    status.mapFinishTime = 0;
+    status.outputSize = 0;
+    status.phase = phase;
+    status.progress = 0.5f;
+    status.shuffleFinishTime = 0;
+    status.sortFinishTime = 0;
+    status.stateString = "OK";
+    status.taskState = attempt.getState();
+    TaskAttemptStatusUpdateEvent event = new TaskAttemptStatusUpdateEvent(attempt.getID(),
+        status);
+    app.getContext().getEventHandler().handle(event);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, 
       TaskAttempt mapAttempt) {

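The new test pins down a proportional fetch-failure rule: three reports from a single reducer while all three reducers are still shuffling do not invalidate the map output, but one more report once only one reducer remains in the shuffle phase does. A toy expression of that rule, consistent with the behavior asserted above (the constants and the exact ratio are assumptions for illustration, not JobImpl's actual values):

    // Toy sketch of the proportional fetch-failure rule; not JobImpl code.
    final class FetchFailureRule {
      static final int MIN_NOTIFICATIONS = 3;    // assumed floor
      static final float FAULTY_FRACTION = 0.5f; // assumed fraction

      /** Should the already-succeeded map output be declared lost? */
      static boolean mapOutputLost(int notifications, int reportingReducers,
          int shufflingReducers) {
        if (notifications < MIN_NOTIFICATIONS) {
          return false;  // too few reports to act on at all
        }
        // weigh reports against reducers that are still shuffling
        float rate = shufflingReducers == 0
            ? 1.0f : (float) reportingReducers / shufflingReducers;
        return rate >= FAULTY_FRACTION;
      }
    }

With the test's numbers: 3 reports from 1 of 3 shuffling reducers gives a rate of 1/3, below the assumed fraction; after two reducers move to the REDUCE phase, the fourth report gives 1/1 and the map is re-run.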
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Thu Nov  8 19:09:46 2012
@@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -68,7 +65,6 @@ import org.junit.Test;
    private Path stagingJobPath = new Path(stagingJobDir);
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
-   private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
    
    @Test
    public void testDeletionofStaging() throws IOException {
@@ -86,9 +82,7 @@ import org.junit.Test;
      jobid.setAppId(appId);
      MRAppMaster appMaster = new TestMRApp(attemptId);
      appMaster.init(conf);
-     EventHandler<JobFinishEvent> handler = 
-         appMaster.createJobFinishEventHandler();
-     handler.handle(new JobFinishEvent(jobid));
+     appMaster.shutDownJob();
      verify(fs).delete(stagingJobPath, true);
    }
    

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java Thu Nov  8 19:09:46 2012
@@ -546,6 +546,105 @@ public class TestTaskAttempt{
         eventHandler.internalError);
   }
 
+  @Test
+  public void testAppDiagnosticEventOnUnassignedTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptEvent(attemptId,
+        TaskAttemptEventType.TA_SCHEDULE));
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+        eventHandler.internalError);
+  }
+
+  @Test
+  public void testAppDiagnosticEventOnNewTask() throws Exception {
+    ApplicationId appId = BuilderUtils.newApplicationId(1, 2);
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        appId, 0);
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP);
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
+    Path jobFile = mock(Path.class);
+
+    MockEventHandler eventHandler = new MockEventHandler();
+    TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+    when(taListener.getAddress()).thenReturn(
+        new InetSocketAddress("localhost", 0));
+
+    JobConf jobConf = new JobConf();
+    jobConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+    jobConf.setBoolean("fs.file.impl.disable.cache", true);
+    jobConf.set(JobConf.MAPRED_MAP_TASK_ENV, "");
+    jobConf.set(MRJobConfig.APPLICATION_ATTEMPT_ID, "10");
+
+    TaskSplitMetaInfo splits = mock(TaskSplitMetaInfo.class);
+    when(splits.getLocations()).thenReturn(new String[] { "127.0.0.1" });
+
+    AppContext appCtx = mock(AppContext.class);
+    ClusterInfo clusterInfo = mock(ClusterInfo.class);
+    Resource resource = mock(Resource.class);
+    when(appCtx.getClusterInfo()).thenReturn(clusterInfo);
+    when(clusterInfo.getMinContainerCapability()).thenReturn(resource);
+    when(resource.getMemory()).thenReturn(1024);
+
+    TaskAttemptImpl taImpl = new MapTaskAttemptImpl(taskId, 1, eventHandler,
+        jobFile, 1, splits, jobConf, taListener, mock(OutputCommitter.class),
+        mock(Token.class), new Credentials(), new SystemClock(), appCtx);
+
+    NodeId nid = BuilderUtils.newNodeId("127.0.0.1", 0);
+    ContainerId contId = BuilderUtils.newContainerId(appAttemptId, 3);
+    Container container = mock(Container.class);
+    when(container.getId()).thenReturn(contId);
+    when(container.getNodeId()).thenReturn(nid);
+    when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+    taImpl.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptId,
+        "Task got killed"));
+    assertFalse(
+        "InternalError occurred trying to handle TA_DIAGNOSTICS_UPDATE on assigned task",
+        eventHandler.internalError);
+  }
+
+
   public static class MockEventHandler implements EventHandler {
     public boolean internalError;
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Thu Nov  8 19:09:46 2012
@@ -418,6 +418,21 @@ public class TestTaskImpl {
     killRunningTaskAttempt(getLastAttempt().getAttemptId());    
   }
 
+  @Test
+  public void testKillSuccessfulTask() {
+    LOG.info("--- START: testKillSuccessfulTask ---");
+    mockTask = createMockTask(TaskType.MAP);
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    commitTaskAttempt(getLastAttempt().getAttemptId());
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertTaskSucceededState();
+    mockTask.handle(new TaskEvent(taskId, TaskEventType.T_KILL));
+    assertTaskSucceededState();
+  }
+
   @Test 
   public void testTaskProgress() {
     LOG.info("--- START: testTaskProgress ---");
@@ -485,7 +500,8 @@ public class TestTaskImpl {
     assertTaskSucceededState();
   }
   
-  private void runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType failEvent) {
+  private void runSpeculativeTaskAttemptSucceeds(
+      TaskEventType firstAttemptFinishEvent) {
     TaskId taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(getLastAttempt().getAttemptId());
@@ -502,9 +518,9 @@ public class TestTaskImpl {
     // The task should now have succeeded
     assertTaskSucceededState();
     
-    // Now fail the first task attempt, after the second has succeeded
+    // Now complete the first task attempt, after the second has succeeded
     mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), 
-        failEvent));
+        firstAttemptFinishEvent));
     
     // The task should still be in the succeeded state
     assertTaskSucceededState();
@@ -513,25 +529,36 @@ public class TestTaskImpl {
   @Test
   public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
     mockTask = createMockTask(TaskType.MAP);        
-    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
   }
 
   @Test
   public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstFails() {
     mockTask = createMockTask(TaskType.REDUCE);        
-    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_FAILED);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_FAILED);
   }
   
   @Test
   public void testMapSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
     mockTask = createMockTask(TaskType.MAP);        
-    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
   }
 
   @Test
   public void testReduceSpeculativeTaskAttemptSucceedsEvenIfFirstIsKilled() {
     mockTask = createMockTask(TaskType.REDUCE);        
-    runSpeculativeTaskAttemptSucceedsEvenIfFirstFails(TaskEventType.T_ATTEMPT_KILLED);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_KILLED);
   }
 
+  @Test
+  public void testMultipleTaskAttemptsSucceed() {
+    mockTask = createMockTask(TaskType.MAP);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_SUCCEEDED);
+  }
+
+  @Test
+  public void testCommitAfterSucceeds() {
+    mockTask = createMockTask(TaskType.REDUCE);
+    runSpeculativeTaskAttemptSucceeds(TaskEventType.T_ATTEMPT_COMMIT_PENDING);
+  }
 }

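The invariant the new TestTaskImpl cases assert: once a task reaches SUCCEEDED, a late T_KILL, a duplicate T_ATTEMPT_SUCCEEDED, or a straggling T_ATTEMPT_COMMIT_PENDING from a speculative attempt must leave it SUCCEEDED. A self-contained toy of that invariant (illustration only, not the actual TaskImpl state machine):

    // Toy model: SUCCEEDED is terminal with respect to late attempt events.
    enum ToyState { RUNNING, SUCCEEDED, KILLED }
    enum ToyEvent { T_ATTEMPT_SUCCEEDED, T_ATTEMPT_COMMIT_PENDING, T_KILL }

    final class ToyTask {
      private ToyState state = ToyState.RUNNING;

      ToyState handle(ToyEvent e) {
        if (state == ToyState.SUCCEEDED) {
          return state; // ignore stragglers: stay SUCCEEDED
        }
        switch (e) {
          case T_ATTEMPT_SUCCEEDED: state = ToyState.SUCCEEDED; break;
          case T_KILL:              state = ToyState.KILLED;    break;
          default: break; // commit-pending: attempt keeps running
        }
        return state;
      }
    }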
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ClusterStatus.java Thu Nov  8 19:09:46 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableUtil
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.TaskTrackerInfo;
 import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * Status information on the current state of the Map-Reduce cluster.
@@ -141,9 +142,9 @@ public class ClusterStatus implements Wr
 
     @Override
     public void readFields(DataInput in) throws IOException {
-      trackerName = Text.readString(in);
-      reasonForBlackListing = Text.readString(in);
-      blackListReport = Text.readString(in);
+      trackerName = StringInterner.weakIntern(Text.readString(in));
+      reasonForBlackListing = StringInterner.weakIntern(Text.readString(in));
+      blackListReport = StringInterner.weakIntern(Text.readString(in));
     }
 
     @Override
@@ -429,7 +430,7 @@ public class ClusterStatus implements Wr
     int numTrackerNames = in.readInt();
     if (numTrackerNames > 0) {
       for (int i = 0; i < numTrackerNames; i++) {
-        String name = Text.readString(in);
+        String name = StringInterner.weakIntern(Text.readString(in));
         activeTrackers.add(name);
       }
     }

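For readers unfamiliar with the interning change above (repeated across the files below): weakIntern() returns one canonical instance per distinct string, so the many identical tracker names, user names, and state strings a long-lived daemon deserializes share storage instead of each readFields() call allocating a fresh copy. The real org.apache.hadoop.util.StringInterner wraps a Guava interner; a minimal stand-in, with internal details assumed for illustration:

    import com.google.common.collect.Interner;
    import com.google.common.collect.Interners;

    public final class WeakStrings {
      // weak interner: canonical copies are GC-eligible once unreferenced
      private static final Interner<String> POOL = Interners.newWeakInterner();

      public static String weakIntern(String s) {
        return s == null ? null : POOL.intern(s);
      }
    }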
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobProfile.java Thu Nov  8 19:09:46 2012
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
 import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.util.StringInterner;
 
 /**************************************************
  * A JobProfile is a MapReduce primitive.  Tracks a job,
@@ -176,11 +177,11 @@ public class JobProfile implements Writa
 
   public void readFields(DataInput in) throws IOException {
     jobid.readFields(in);
-    this.jobFile = Text.readString(in);
-    this.url = Text.readString(in);
-    this.user = Text.readString(in);
-    this.name = Text.readString(in);
-    this.queueName = Text.readString(in);
+    this.jobFile = StringInterner.weakIntern(Text.readString(in));
+    this.url = StringInterner.weakIntern(Text.readString(in));
+    this.user = StringInterner.weakIntern(Text.readString(in));
+    this.name = StringInterner.weakIntern(Text.readString(in));
+    this.queueName = StringInterner.weakIntern(Text.readString(in));
   }
 }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/MapTask.java Thu Nov  8 19:09:46 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.util.IndexedSor
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /** A Map task. */
@@ -343,7 +344,7 @@ class MapTask extends Task {
    FileSystem fs = file.getFileSystem(conf);
    FSDataInputStream inFile = fs.open(file);
    inFile.seek(offset);
-   String className = Text.readString(inFile);
+   String className = StringInterner.weakIntern(Text.readString(inFile));
    Class<T> cls;
    try {
      cls = (Class<T>) conf.getClassByName(className);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java Thu Nov  8 19:09:46 2012
@@ -67,6 +67,7 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -467,7 +468,7 @@ abstract public class Task implements Wr
   }
   
   public void readFields(DataInput in) throws IOException {
-    jobFile = Text.readString(in);
+    jobFile = StringInterner.weakIntern(Text.readString(in));
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
     numSlotsRequired = in.readInt();
@@ -487,7 +488,7 @@ abstract public class Task implements Wr
     if (taskCleanup) {
       setPhase(TaskStatus.Phase.CLEANUP);
     }
-    user = Text.readString(in);
+    user = StringInterner.weakIntern(Text.readString(in));
     extraData.readFields(in);
   }
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskStatus.java Thu Nov  8 19:09:46 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.util.StringUtils;
 /**************************************************
  * Describes the current status of a task.  This is
@@ -477,8 +478,8 @@ public abstract class TaskStatus impleme
     setProgress(in.readFloat());
     this.numSlots = in.readInt();
     this.runState = WritableUtils.readEnum(in, State.class);
-    setDiagnosticInfo(Text.readString(in));
-    setStateString(Text.readString(in));
+    setDiagnosticInfo(StringInterner.weakIntern(Text.readString(in)));
+    setStateString(StringInterner.weakIntern(Text.readString(in)));
     this.phase = WritableUtils.readEnum(in, Phase.class); 
     this.startTime = in.readLong(); 
     this.finishTime = in.readLong(); 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/TaggedInputSplit.java Thu Nov  8 19:09:46 2012
@@ -29,6 +29,7 @@ import org.apache.hadoop.mapred.InputFor
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringInterner;
 
 /**
  * An {@link InputSplit} that tags another InputSplit with extra data for use
@@ -114,7 +115,7 @@ class TaggedInputSplit implements Config
   }
 
   private Class<?> readClass(DataInput in) throws IOException {
-    String className = Text.readString(in);
+    String className = StringInterner.weakIntern(Text.readString(in));
     try {
       return conf.getClassByName(className);
     } catch (ClassNotFoundException e) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobStatus.java Thu Nov  8 19:09:46 2012
@@ -32,6 +32,7 @@ import org.apache.hadoop.io.WritableFact
 import org.apache.hadoop.io.WritableFactory;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.util.StringInterner;
 
 /**************************************************
  * Describes the current status of a job.
@@ -456,15 +457,15 @@ public class JobStatus implements Writab
     this.cleanupProgress = in.readFloat();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.startTime = in.readLong();
-    this.user = Text.readString(in);
+    this.user = StringInterner.weakIntern(Text.readString(in));
     this.priority = WritableUtils.readEnum(in, JobPriority.class);
-    this.schedulingInfo = Text.readString(in);
+    this.schedulingInfo = StringInterner.weakIntern(Text.readString(in));
     this.finishTime = in.readLong();
     this.isRetired = in.readBoolean();
-    this.historyFile = Text.readString(in);
-    this.jobName = Text.readString(in);
-    this.trackingUrl = Text.readString(in);
-    this.jobFile = Text.readString(in);
+    this.historyFile = StringInterner.weakIntern(Text.readString(in));
+    this.jobName = StringInterner.weakIntern(Text.readString(in));
+    this.trackingUrl = StringInterner.weakIntern(Text.readString(in));
+    this.jobFile = StringInterner.weakIntern(Text.readString(in));
     this.isUber = in.readBoolean();
 
     // De-serialize the job's ACLs

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1407217&r1=1407216&r2=1407217&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Nov  8 19:09:46 2012
@@ -262,6 +262,9 @@ public interface MRJobConfig {
   public static final String SHUFFLE_FETCH_FAILURES = "mapreduce.reduce.shuffle.maxfetchfailures";
 
   public static final String SHUFFLE_NOTIFY_READERROR = "mapreduce.reduce.shuffle.notify.readerror";
+  
+  public static final String MAX_SHUFFLE_FETCH_RETRY_DELAY = "mapreduce.reduce.shuffle.retry-delay.max.ms";
+  public static final long DEFAULT_MAX_SHUFFLE_FETCH_RETRY_DELAY = 60000;
 
   public static final String REDUCE_SKIP_INCR_PROC_COUNT = "mapreduce.reduce.skip.proc-count.auto-incr";
 
@@ -486,6 +489,9 @@ public interface MRJobConfig {
   public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
       50;
   
+  public static final String MR_AM_ENV =
+      MR_AM_PREFIX + "env";
+  
   public static final String MAPRED_MAP_ADMIN_JAVA_OPTS =
       "mapreduce.admin.map.child.java.opts";
 

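A hypothetical driver showing the two keys added above in use; the values and the environment string are placeholders, not recommendations:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SubmitWithNewKeys {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // cap the back-off between shuffle fetch retries at 30 seconds
        conf.setLong(MRJobConfig.MAX_SHUFFLE_FETCH_RETRY_DELAY, 30000L);
        // environment variables for the MR ApplicationMaster container
        conf.set(MRJobConfig.MR_AM_ENV, "LD_LIBRARY_PATH=/usr/local/lib");
        Job job = Job.getInstance(conf, "example");
        // ... configure mapper/reducer and paths, then:
        // System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }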

