hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1354832 [1/3] - in /hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/ hadoop-mapreduce-client/hadoop-mapreduce-clie...
Date: Thu, 28 Jun 2012 07:00:39 GMT
Author: szetszwo
Date: Thu Jun 28 06:59:38 2012
New Revision: 1354832

URL: http://svn.apache.org/viewvc?rev=1354832&view=rev
Log:
Merge r1346682 through r1354801 from trunk.

Added:
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java
      - copied unchanged from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobUpdatedNodesEvent.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java
      - copied unchanged from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/
      - copied from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/
      - copied from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
      - copied unchanged from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestShuffleScheduler.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
      - copied unchanged from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/CMakeLists.txt
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/config.h.cmake
      - copied unchanged from r1354801, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/config.h.cmake
Removed:
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/.autom4te.cfg
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/.deps/container-executor.Po
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/Makefile.am
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/configure.ac
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/c++/pipes/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/c++/utils/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/raid/bin/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/raid/conf/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/raid/src/java/org/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/raid/src/test/org/
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/examples/pipes/
Modified:
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/TypeConverter.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapred/TestMRWithDistributedCache.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/TestTypeConverter.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Job.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmissionFiles.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/ShuffleScheduler.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestResourceMgrDelegate.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeState.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ApplicationPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/ContainerPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NodePage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/native/container-executor/impl/main.c
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppPage.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/CapacityScheduler.apt.vm
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/HDFSHighAvailability.apt.vm
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/c++/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/block_forensics/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/build-contrib.xml   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/build.xml   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/data_join/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/eclipse-plugin/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/index/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/contrib/vaidya/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/examples/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/java/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/test/mapred/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/fs/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/hdfs/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/test/mapred/org/apache/hadoop/ipc/   (props changed)
    hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/src/webapps/job/   (props changed)

Propchange: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1346682-1354801

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt Thu Jun 28 06:59:38 2012
@@ -60,6 +60,9 @@ Trunk (unreleased changes)
 
   BUG FIXES
 
+    MAPREDUCE-4356. [Rumen] Provide access to the method
+                    ParsedTask.obtainTaskAttempts(). (ravigummadi)
+
     MAPREDUCE-4100. [Gridmix] Bug fixed in compression emulation feature for 
                     map only jobs. (amarrk)
 
@@ -118,7 +121,9 @@ Trunk (unreleased changes)
     MAPREDUCE-3990. MRBench allows Long-sized input-lines value
     but parses CLI argument as an Integer. (harsh)
 
-Release 2.0.1-alpha - UNRELEASED
+    MAPREDUCE-3868. Make Raid Compile. (Weiyan Wang via schen)
+
+Branch-2 ( Unreleased changes )
 
   INCOMPATIBLE CHANGES
 
@@ -129,6 +134,12 @@ Release 2.0.1-alpha - UNRELEASED
     MAPREDUCE-4146. Support limits on task status string length and number of
     block locations in branch-2. (Ahmed Radwan via tomwhite)
 
+    MAPREDUCE-3871. Allow symlinking in LocalJobRunner DistributedCache.
+    (tomwhite)
+
+    MAPREDUCE-3921. MR AM should act on node health status changes. 
+    (Bikas Saha via sseth)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -166,6 +177,23 @@ Release 2.0.1-alpha - UNRELEASED
     MAPREDUCE-3873. Fixed NodeManagers' decommissioning at RM to accept IP
     addresses also. (xieguiming via vinodkv)
 
+    MAPREDUCE-4306. Fix distributed shell to work with users other than the one
+    running the daemons. (Ahmed Radwan via sseth)
+
+    MAPREDUCE-4031. Prevent a Node Manager hang during shutdown. 
+    (Devaraj K via sseth)
+
+    MAPREDUCE-4336. Distributed Shell fails when used with the CapacityScheduler
+    (ahmed via tucu)
+
+    MAPREDUCE-4290. Fix Yarn Applicaiton Status to MR JobState conversion. 
+    (Devaraj K via sseth)
+
+    MAPREDUCE-2289. Permissions race can make getStagingDir fail on local filesystem 
+    (ahmed via tucu)
+
+    MAPREDUCE-4372. Deadlock in Resource Manager (Devaraj K via bobby)
+
 Release 2.0.0-alpha - 05-23-2012
 
   INCOMPATIBLE CHANGES
@@ -386,6 +414,8 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4301. Dedupe some strings in MRAM for memory savings 
     (bobby via tgraves)
 
+    MAPREDUCE-4267. mavenize pipes (tgraves via bobby)
+
   OPTIMIZATIONS
 
     MAPREDUCE-3850. Avoid redundant calls for tokens in TokenCache (Daryn
@@ -568,6 +598,30 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3350. Per-app RM page should have the list of application-attempts
     like on the app JHS page (Jonathon Eagles via tgraves)
 
+    MAPREDUCE-3842. Stop webpages from automatic refreshing (tgraves)
+
+    MAPREDUCE-3927. Shuffle hang when set map.failures.percent
+    (Bhallamudi Venkata Siva Kamesh via tgraves)
+
+    MAPREDUCE-4311. Capacity scheduler.xml does not accept decimal values for
+    capacity and maximum-capacity settings (Karthik Kambatla via tgraves)
+
+    MAPREDUCE-4341. add types to capacity scheduler properties documentation
+    (Karthik Kambatla via tgraves)
+
+    MAPREDUCE-4270. Move the data_join test classes to the correct path.
+    (Thomas Graves via sseth)
+
+    MAPREDUCE-3889. job client tries to use /tasklog interface, but that
+    doesn't exist anymore (Devaraj K via bobby)
+
+    MAPREDUCE-4320. gridmix mainClass wrong in pom.xml (tgraves)
+
+    MAPREDUCE-4295. RM crashes due to DNS issue (tgraves)
+
+    MAPREDUCE-4228. mapreduce.job.reduce.slowstart.completedmaps is not working
+    properly (Jason Lowe via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1346682-1354801

Propchange: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1346682-1354801

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Thu Jun 28 06:59:38 2012
@@ -25,6 +25,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 
 
 /**
@@ -55,6 +56,11 @@ public interface TaskAttempt {
   String getAssignedContainerMgrAddress();
   
   /**
+   * @return node's id if a container is assigned, otherwise null.
+   */
+  NodeId getNodeId();
+  
+  /**
    * @return node's http address if a container is assigned, otherwise null.
    */
   String getNodeHttpAddress();
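
The new getNodeId() accessor follows the same guarded-getter pattern as the other TaskAttempt accessors: it returns the assigned container's node, or null when no container has been assigned yet. A minimal, self-contained sketch of that pattern, assuming a plain String in place of org.apache.hadoop.yarn.api.records.NodeId (class and method names here are hypothetical):

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Sketch of a read-lock guarded accessor like the one TaskAttemptImpl
    // gains below: a writer sets the node once a container is assigned, and
    // readers may observe null before assignment, which is the documented
    // contract of getNodeId().
    class GuardedNodeIdSketch {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();
        private String containerNodeId; // null until a container is assigned

        String getNodeId() {
            lock.readLock().lock();
            try {
                return containerNodeId;
            } finally {
                lock.readLock().unlock();
            }
        }

        void assignContainer(String nodeId) {
            lock.writeLock().lock();
            try {
                containerNodeId = nodeId;
            } finally {
                lock.writeLock().unlock();
            }
        }

        public static void main(String[] args) {
            GuardedNodeIdSketch attempt = new GuardedNodeIdSketch();
            System.out.println(attempt.getNodeId()); // null: no container yet
            attempt.assignContainer("node-a:8041");
            System.out.println(attempt.getNodeId()); // node-a:8041
        }
    }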

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Thu Jun 28 06:59:38 2012
@@ -44,5 +44,9 @@ public enum JobEventType {
   JOB_COUNTER_UPDATE,
   
   //Producer:TaskAttemptListener
-  JOB_TASK_ATTEMPT_FETCH_FAILURE
+  JOB_TASK_ATTEMPT_FETCH_FAILURE,
+  
+  //Producer:RMContainerAllocator
+  JOB_UPDATED_NODES
+  
 }
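
Adding the constant is only half the change: every state of the JobImpl state machine below must be taught about JOB_UPDATED_NODES, and states that cannot act on it (NEW, INITED, and the terminal states) add it to their "ignore-able events" set so the dispatcher does not reject it as an invalid transition. A minimal sketch of that idiom, with a trimmed-down enum and a hand-rolled check standing in for the StateMachineFactory:

    import java.util.EnumSet;

    // Sketch of the "ignore-able events" idiom used throughout JobImpl below:
    // an event type must be legal in every state, so states that cannot act
    // on JOB_UPDATED_NODES map it to a self-loop that does no work.
    class IgnorableEventsSketch {
        enum JobEventType { JOB_KILL, JOB_UPDATED_NODES, JOB_TASK_ATTEMPT_FETCH_FAILURE }

        static final EnumSet<JobEventType> IGNORED_WHEN_KILLED = EnumSet.of(
            JobEventType.JOB_KILL,
            JobEventType.JOB_UPDATED_NODES,
            JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE);

        static void onEventInKilledState(JobEventType type) {
            if (IGNORED_WHEN_KILLED.contains(type)) {
                return; // self-loop: stay KILLED, do nothing
            }
            throw new IllegalStateException("invalid event " + type + " in KILLED");
        }

        public static void main(String[] args) {
            onEventInKilledState(JobEventType.JOB_UPDATED_NODES); // ignored
        }
    }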

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Jun 28 06:59:38 2012
@@ -77,6 +77,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -85,8 +86,10 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
@@ -100,6 +103,9 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -148,6 +154,12 @@ public class JobImpl implements org.apac
   private final Object tasksSyncHandle = new Object();
   private final Set<TaskId> mapTasks = new LinkedHashSet<TaskId>();
   private final Set<TaskId> reduceTasks = new LinkedHashSet<TaskId>();
+  /**
+   * maps nodes to tasks that have run on those nodes
+   */
+  private final HashMap<NodeId, List<TaskAttemptId>> 
+    nodesToSucceededTaskAttempts = new HashMap<NodeId, List<TaskAttemptId>>();
+
   private final EventHandler eventHandler;
   private final MRAppMetrics metrics;
   private final String userName;
@@ -194,6 +206,8 @@ public class JobImpl implements org.apac
           new TaskAttemptCompletedEventTransition();
   private static final CounterUpdateTransition COUNTER_UPDATE_TRANSITION =
       new CounterUpdateTransition();
+  private static final UpdatedNodesTransition UPDATED_NODES_TRANSITION =
+      new UpdatedNodesTransition();
 
   protected static final
     StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> 
@@ -218,7 +232,10 @@ public class JobImpl implements org.apac
           .addTransition(JobState.NEW, JobState.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
+          // Ignore-able events
+          .addTransition(JobState.NEW, JobState.NEW,
+              JobEventType.JOB_UPDATED_NODES)
+              
           // Transitions from INITED state
           .addTransition(JobState.INITED, JobState.INITED,
               JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -234,7 +251,10 @@ public class JobImpl implements org.apac
           .addTransition(JobState.INITED, JobState.ERROR,
               JobEventType.INTERNAL_ERROR,
               INTERNAL_ERROR_TRANSITION)
-
+          // Ignore-able events
+          .addTransition(JobState.INITED, JobState.INITED,
+              JobEventType.JOB_UPDATED_NODES)
+              
           // Transitions from RUNNING state
           .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
@@ -252,6 +272,9 @@ public class JobImpl implements org.apac
           .addTransition(JobState.RUNNING, JobState.KILL_WAIT,
               JobEventType.JOB_KILL, new KillTasksTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
+              JobEventType.JOB_UPDATED_NODES,
+              UPDATED_NODES_TRANSITION)
+          .addTransition(JobState.RUNNING, JobState.RUNNING,
               JobEventType.JOB_MAP_TASK_RESCHEDULED,
               new MapTaskRescheduledTransition())
           .addTransition(JobState.RUNNING, JobState.RUNNING,
@@ -288,8 +311,9 @@ public class JobImpl implements org.apac
           // Ignore-able events
           .addTransition(JobState.KILL_WAIT, JobState.KILL_WAIT,
               EnumSet.of(JobEventType.JOB_KILL,
-                         JobEventType.JOB_MAP_TASK_RESCHEDULED,
-                         JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
+                  JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
+                  JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from SUCCEEDED state
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
@@ -303,7 +327,8 @@ public class JobImpl implements org.apac
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.SUCCEEDED, JobState.SUCCEEDED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from FAILED state
@@ -318,7 +343,8 @@ public class JobImpl implements org.apac
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.FAILED, JobState.FAILED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state
@@ -333,7 +359,8 @@ public class JobImpl implements org.apac
               INTERNAL_ERROR_TRANSITION)
           // Ignore-able events
           .addTransition(JobState.KILLED, JobState.KILLED,
-              EnumSet.of(JobEventType.JOB_KILL,
+              EnumSet.of(JobEventType.JOB_KILL, 
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // No transitions from INTERNAL_ERROR state. Ignore all.
@@ -346,6 +373,7 @@ public class JobImpl implements org.apac
                   JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
                   JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_DIAGNOSTIC_UPDATE,
+                  JobEventType.JOB_UPDATED_NODES,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
                   JobEventType.INTERNAL_ERROR))
           .addTransition(JobState.ERROR, JobState.ERROR,
@@ -590,9 +618,9 @@ public class JobImpl implements org.apac
       float reduceProgress = 0f;
       for (Task task : this.tasks.values()) {
         if (task.getType() == TaskType.MAP) {
-          mapProgress += task.getProgress();
+          mapProgress += (task.isFinished() ? 1f : task.getProgress());
         } else {
-          reduceProgress += task.getProgress();
+          reduceProgress += (task.isFinished() ? 1f : task.getProgress());
         }
       }
       if (this.numMapTasks != 0) {
@@ -895,7 +923,7 @@ public class JobImpl implements org.apac
       LOG.info(msg.toString());
     }
   }
-
+  
   /**
    * ChainMapper and ChainReducer must execute in parallel, so they're not
    * compatible with uberization/LocalContainerLauncher (100% sequential).
@@ -924,6 +952,24 @@ public class JobImpl implements org.apac
     }
     return isChainJob;
   }
+  
+  private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) {
+    // rerun previously successful map tasks
+    List<TaskAttemptId> taskAttemptIdList = nodesToSucceededTaskAttempts.get(nodeId);
+    if(taskAttemptIdList != null) {
+      String mesg = "TaskAttempt killed because it ran on unusable node "
+          + nodeId;
+      for(TaskAttemptId id : taskAttemptIdList) {
+        if(TaskType.MAP == id.getTaskId().getTaskType()) {
+          // reschedule only map tasks because their outputs maybe unusable
+          LOG.info(mesg + ". AttemptId:" + id);
+          eventHandler.handle(new TaskAttemptKillEvent(id, mesg));
+        }
+      }
+    }
+    // currently running task attempts on unusable nodes are handled in
+    // RMContainerAllocator
+  }
 
   /*
   private int getBlockSize() {
@@ -1269,18 +1315,37 @@ public class JobImpl implements org.apac
       tce.setEventId(job.taskAttemptCompletionEvents.size());
       job.taskAttemptCompletionEvents.add(tce);
       
+      TaskAttemptId attemptId = tce.getAttemptId();
+      TaskId taskId = attemptId.getTaskId();
       //make the previous completion event as obsolete if it exists
       Object successEventNo = 
-        job.successAttemptCompletionEventNoMap.remove(tce.getAttemptId().getTaskId());
+        job.successAttemptCompletionEventNoMap.remove(taskId);
       if (successEventNo != null) {
         TaskAttemptCompletionEvent successEvent = 
           job.taskAttemptCompletionEvents.get((Integer) successEventNo);
         successEvent.setStatus(TaskAttemptCompletionEventStatus.OBSOLETE);
       }
-
+      
+      // if this attempt is not successful then why is the previous successful 
+      // attempt being removed above - MAPREDUCE-4330
       if (TaskAttemptCompletionEventStatus.SUCCEEDED.equals(tce.getStatus())) {
-        job.successAttemptCompletionEventNoMap.put(tce.getAttemptId().getTaskId(), 
-            tce.getEventId());
+        job.successAttemptCompletionEventNoMap.put(taskId, tce.getEventId());
+        
+        // here we could have simply called Task.getSuccessfulAttempt() but
+        // the event that triggers this code is sent before
+        // Task.successfulAttempt is set and so there is no guarantee that it
+        // will be available now
+        Task task = job.tasks.get(taskId);
+        TaskAttempt attempt = task.getAttempt(attemptId);
+        NodeId nodeId = attempt.getNodeId();
+        assert (nodeId != null); // node must exist for a successful event
+        List<TaskAttemptId> taskAttemptIdList = job.nodesToSucceededTaskAttempts
+            .get(nodeId);
+        if (taskAttemptIdList == null) {
+          taskAttemptIdList = new ArrayList<TaskAttemptId>();
+          job.nodesToSucceededTaskAttempts.put(nodeId, taskAttemptIdList);
+        }
+        taskAttemptIdList.add(attempt.getID());
       }
     }
   }
@@ -1460,7 +1525,22 @@ public class JobImpl implements org.apac
       }
     }
   }
-
+  
+  private static class UpdatedNodesTransition implements
+      SingleArcTransition<JobImpl, JobEvent> {
+    @Override
+    public void transition(JobImpl job, JobEvent event) {
+      JobUpdatedNodesEvent updateEvent = (JobUpdatedNodesEvent) event;
+      for(NodeReport nr: updateEvent.getUpdatedNodes()) {
+        NodeState nodeState = nr.getNodeState();
+        if(nodeState.isUnusable()) {
+          // act on the updates
+          job.actOnUnusableNode(nr.getNodeId(), nodeState);
+        }
+      }
+    }
+  }
+  
   private static class InternalErrorTransition implements
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
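
Taken together, the JobImpl changes keep an index from each node to the attempts that succeeded on it, and when the ResourceManager reports a node unusable, UpdatedNodesTransition calls actOnUnusableNode to kill the previously successful map attempts so they can be rerun. Only map attempts are rescheduled: as the in-line comment above notes, their outputs live on the dead node's local disk, while reduce output has already been committed to the distributed file system. A self-contained sketch of that bookkeeping, with plain Strings standing in for NodeId and TaskAttemptId and a name prefix standing in for the TaskType check:

    import java.util.*;

    // Simplified sketch of the node bookkeeping JobImpl gains in this commit.
    class UnusableNodeSketch {
        static final Map<String, List<String>> nodesToSucceededTaskAttempts =
            new HashMap<String, List<String>>();

        // TaskAttemptCompletedEventTransition: index the succeeded attempt by
        // the node it ran on (the real code asserts the node is non-null).
        static void onAttemptSucceeded(String attemptId, String nodeId) {
            List<String> list = nodesToSucceededTaskAttempts.get(nodeId);
            if (list == null) {
                list = new ArrayList<String>();
                nodesToSucceededTaskAttempts.put(nodeId, list);
            }
            list.add(attemptId);
        }

        // actOnUnusableNode: rerun previously successful map attempts only;
        // their output sits on the unusable node's local disk.
        static void actOnUnusableNode(String nodeId) {
            List<String> attempts = nodesToSucceededTaskAttempts.get(nodeId);
            if (attempts == null) {
                return;
            }
            for (String id : attempts) {
                if (id.startsWith("map")) {
                    System.out.println("TaskAttempt killed because it ran on "
                        + "unusable node " + nodeId + ". AttemptId:" + id);
                }
            }
            // currently running attempts on unusable nodes are handled by the
            // allocator, not here
        }

        public static void main(String[] args) {
            onAttemptSucceeded("map_000000_0", "node-a");
            onAttemptSucceeded("reduce_000000_0", "node-a");
            actOnUnusableNode("node-a"); // only the map attempt is rescheduled
        }
    }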

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Jun 28 06:59:38 2012
@@ -84,6 +84,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
@@ -404,14 +405,17 @@ public abstract class TaskAttemptImpl im
          TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
          new TooManyFetchFailureTransition())
      .addTransition(
+         TaskAttemptState.SUCCEEDED, TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_KILL,
+         new KilledAfterSuccessTransition())
+     .addTransition(
          TaskAttemptState.SUCCEEDED, TaskAttemptState.SUCCEEDED,
          TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
          DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
      // Ignore-able events for SUCCEEDED state
      .addTransition(TaskAttemptState.SUCCEEDED,
          TaskAttemptState.SUCCEEDED,
-         EnumSet.of(TaskAttemptEventType.TA_KILL,
-             TaskAttemptEventType.TA_FAILMSG,
+         EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
              TaskAttemptEventType.TA_CONTAINER_CLEANED,
              TaskAttemptEventType.TA_CONTAINER_COMPLETED))
 
@@ -818,6 +822,16 @@ public abstract class TaskAttemptImpl im
     }
   }
 
+  @Override 
+  public NodeId getNodeId() {
+    readLock.lock();
+    try {
+      return containerNodeId;
+    } finally {
+      readLock.unlock();
+    }
+  }
+  
   /**If container Assigned then return the node's address, otherwise null.
    */
   @Override
@@ -999,7 +1013,7 @@ public abstract class TaskAttemptImpl im
   }
 
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAFailed(
-      TaskAttemptImpl taskAttempt) {
+      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
@@ -1007,16 +1021,22 @@ public abstract class TaskAttemptImpl im
     
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_MAPS, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_FAILED_REDUCES, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      }
     }
     return jce;
   }
   
   private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
-      TaskAttemptImpl taskAttempt) {
+      TaskAttemptImpl taskAttempt, boolean taskAlreadyCompleted) {
     TaskType taskType = taskAttempt.getID().getTaskId().getTaskType();
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(taskAttempt.getID().getTaskId().getJobId());
     
@@ -1024,10 +1044,16 @@ public abstract class TaskAttemptImpl im
     
     if (taskType == TaskType.MAP) {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_MAPS, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_MAPS, slotMillisIncrement);
+      }
     } else {
       jce.addCounterUpdate(JobCounter.NUM_KILLED_REDUCES, 1);
-      jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      if(!taskAlreadyCompleted) {
+        // dont double count the elapsed time
+        jce.addCounterUpdate(JobCounter.SLOTS_MILLIS_REDUCES, slotMillisIncrement);
+      }
     }
     return jce;
   }  
@@ -1259,10 +1285,10 @@ public abstract class TaskAttemptImpl im
                 finalState);
         if(finalState == TaskAttemptState.FAILED) {
           taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
         } else if(finalState == TaskAttemptState.KILLED) {
           taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         }
         taskAttempt.eventHandler.handle(new JobHistoryEvent(
             taskAttempt.attemptId.getTaskId().getJobId(), tauce));
@@ -1394,7 +1420,7 @@ public abstract class TaskAttemptImpl im
       
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.FAILED);
@@ -1463,7 +1489,7 @@ public abstract class TaskAttemptImpl im
       
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAFailed(taskAttempt));
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, true));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.FAILED);
@@ -1477,6 +1503,32 @@ public abstract class TaskAttemptImpl im
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
     }
   }
+  
+  private static class KilledAfterSuccessTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      TaskAttemptKillEvent msgEvent = (TaskAttemptKillEvent) event;
+      //add to diagnostic
+      taskAttempt.addDiagnosticInfo(msgEvent.getMessage());
+
+      // not setting a finish time since it was set on success
+      assert (taskAttempt.getFinishTime() != 0);
+
+      assert (taskAttempt.getLaunchTime() != 0);
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, true));
+      TaskAttemptUnsuccessfulCompletionEvent tauce = createTaskAttemptUnsuccessfulCompletionEvent(
+          taskAttempt, TaskAttemptState.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId
+          .getTaskId().getJobId(), tauce));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
+    }
+  }
 
   private static class KilledTransition implements
       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@@ -1489,7 +1541,7 @@ public abstract class TaskAttemptImpl im
       taskAttempt.setFinishTime();
       if (taskAttempt.getLaunchTime() != 0) {
         taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt));
+            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
         TaskAttemptUnsuccessfulCompletionEvent tauce =
             createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
                 TaskAttemptState.KILLED);
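
The recurring taskAlreadyCompleted flag exists because a TA_KILL or fetch-failure can now arrive after an attempt has already SUCCEEDED, and its slot-milliseconds were charged when it completed; charging them again would double count. A minimal sketch of the guard, with plain long fields standing in for the JobCounter plumbing:

    // Sketch of the double-count guard: killed/failed counts always increase,
    // but elapsed slot time is only charged when the attempt had not already
    // completed (completion charged it once).
    class CounterGuardSketch {
        long numKilledMaps = 0;
        long slotsMillisMaps = 0;

        void mapAttemptKilled(long slotMillisIncrement, boolean taskAlreadyCompleted) {
            numKilledMaps++;
            if (!taskAlreadyCompleted) {
                // don't double count the elapsed time
                slotsMillisMaps += slotMillisIncrement;
            }
        }

        public static void main(String[] args) {
            CounterGuardSketch c = new CounterGuardSketch();
            c.mapAttemptKilled(5000, false); // normal kill: time is charged
            c.mapAttemptKilled(5000, true);  // killed after success: count only
            System.out.println(c.numKilledMaps + " kills, "
                + c.slotsMillisMaps + " slot-millis"); // 2 kills, 5000 slot-millis
        }
    }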

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Jun 28 06:59:38 2012
@@ -191,13 +191,14 @@ public abstract class TaskImpl implement
     .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
         TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED),
+        TaskEventType.T_ATTEMPT_KILLED, new MapRetroactiveKilledTransition())
     // Ignore-able transitions.
     .addTransition(
         TaskState.SUCCEEDED, TaskState.SUCCEEDED,
-        EnumSet.of(TaskEventType.T_KILL,
-            TaskEventType.T_ADD_SPEC_ATTEMPT,
-            TaskEventType.T_ATTEMPT_LAUNCHED,
-            TaskEventType.T_ATTEMPT_KILLED))
+        EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_LAUNCHED))
 
     // Transitions from FAILED state        
     .addTransition(TaskState.FAILED, TaskState.FAILED,
@@ -629,7 +630,6 @@ public abstract class TaskImpl implement
   // always called inside a transition, in turn inside the Write Lock
   private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
       TaskAttemptCompletionEventStatus status) {
-    finishedAttempts++;
     TaskAttempt attempt = attempts.get(attemptId);
     //raise the completion event only if the container is assigned
     // to nextAttemptNumber
@@ -681,6 +681,11 @@ public abstract class TaskImpl implement
         taId == null ? null : TypeConverter.fromYarn(taId));
     return taskFailedEvent;
   }
+  
+  private static void unSucceed(TaskImpl task) {
+    task.commitAttempt = null;
+    task.successfulAttempt = null;
+  }
 
   /**
   * @return a String representation of the splits.
@@ -755,6 +760,7 @@ public abstract class TaskImpl implement
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.SUCCEEDED);
+      task.finishedAttempts++;
       --task.numberUncompletedAttempts;
       task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
       task.eventHandler.handle(new JobTaskEvent(
@@ -790,6 +796,7 @@ public abstract class TaskImpl implement
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
+      task.finishedAttempts++;
       --task.numberUncompletedAttempts;
       if (task.successfulAttempt == null) {
         task.addAndScheduleAttempt();
@@ -808,6 +815,7 @@ public abstract class TaskImpl implement
       task.handleTaskAttemptCompletion(
           ((TaskTAttemptEvent) event).getTaskAttemptID(), 
           TaskAttemptCompletionEventStatus.KILLED);
+      task.finishedAttempts++;
       // check whether all attempts are finished
       if (task.finishedAttempts == task.attempts.size()) {
         if (task.historyTaskStartGenerated) {
@@ -845,6 +853,7 @@ public abstract class TaskImpl implement
             attempt.getAssignedContainerMgrAddress()));
       }
       
+      task.finishedAttempts++;
       if (task.failedAttempts < task.maxAttempts) {
         task.handleTaskAttemptCompletion(
             ((TaskTAttemptEvent) event).getTaskAttemptID(), 
@@ -880,12 +889,6 @@ public abstract class TaskImpl implement
     protected TaskState getDefaultState(Task task) {
       return task.getState();
     }
-
-    protected void unSucceed(TaskImpl task) {
-      ++task.numberUncompletedAttempts;
-      task.commitAttempt = null;
-      task.successfulAttempt = null;
-    }
   }
 
   private static class MapRetroactiveFailureTransition
@@ -908,6 +911,8 @@ public abstract class TaskImpl implement
       //  fails, we have to let AttemptFailedTransition.transition
       //  believe that there's no redundancy.
       unSucceed(task);
+      // fake increase in Uncomplete attempts for super.transition
+      ++task.numberUncompletedAttempts;
       return super.transition(task, event);
     }
 
@@ -917,6 +922,45 @@ public abstract class TaskImpl implement
     }
   }
 
+  private static class MapRetroactiveKilledTransition implements
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(TaskImpl task, TaskEvent event) {
+      // verify that this occurs only for map task
+      // TODO: consider moving it to MapTaskImpl
+      if (!TaskType.MAP.equals(task.getType())) {
+        LOG.error("Unexpected event for REDUCE task " + event.getType());
+        task.internalError(event.getType());
+      }
+
+      TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event;
+      TaskAttemptId attemptId = attemptEvent.getTaskAttemptID();
+      if(task.successfulAttempt == attemptId) {
+        // successful attempt is now killed. reschedule
+        // tell the job about the rescheduling
+        unSucceed(task);
+        task.handleTaskAttemptCompletion(
+            attemptId, 
+            TaskAttemptCompletionEventStatus.KILLED);
+        task.eventHandler.handle(new JobMapTaskRescheduledEvent(task.taskId));
+        // typically we are here because this map task was run on a bad node and 
+        // we want to reschedule it on a different node.
+        // Depending on whether there are previous failed attempts or not this 
+        // can SCHEDULE or RESCHEDULE the container allocate request. If this
+        // SCHEDULE's then the dataLocal hosts of this taskAttempt will be used
+        // from the map splitInfo. So the bad node might be sent as a location 
+        // to the RM. But the RM would ignore that just like it would ignore 
+        // currently pending container requests affinitized to bad nodes.
+        task.addAndScheduleAttempt();
+        return TaskState.SCHEDULED;
+      } else {
+        // nothing to do
+        return TaskState.SUCCEEDED;
+      }
+    }
+  }
+
   private static class KillNewTransition 
     implements SingleArcTransition<TaskImpl, TaskEvent> {
     @Override
@@ -966,6 +1010,7 @@ public abstract class TaskImpl implement
     public void transition(TaskImpl task, TaskEvent event) {
       task.metrics.launchedTask(task);
       task.metrics.runningTask(task);
+      
     }
   }
 }
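
MapRetroactiveKilledTransition is a multiple-arc transition: a T_ATTEMPT_KILLED landing on a SUCCEEDED map task sends the task back to SCHEDULED only when the killed attempt is the one that had succeeded; a kill of any other attempt is a no-op. A stripped-down sketch of that decision, with String IDs standing in for TaskAttemptId:

    // Sketch of the new multiple-arc transition on SUCCEEDED map tasks.
    enum TaskState { SCHEDULED, SUCCEEDED }

    class RetroactiveKillSketch {
        static TaskState onAttemptKilled(String killedAttempt, String successfulAttempt) {
            if (killedAttempt.equals(successfulAttempt)) {
                // unSucceed the task and reschedule it, typically on a
                // different node than the one that became unusable
                return TaskState.SCHEDULED;
            }
            return TaskState.SUCCEEDED; // a redundant attempt died; nothing to do
        }

        public static void main(String[] args) {
            System.out.println(onAttemptKilled("attempt_0", "attempt_0")); // SCHEDULED
            System.out.println(onAttemptKilled("attempt_1", "attempt_0")); // SUCCEEDED
        }
    }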

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jun 28 06:59:38 2012
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -46,19 +47,27 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobCounterUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.AMResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
@@ -408,15 +417,6 @@ public class RMContainerAllocator extend
     
     LOG.info("Recalculating schedule...");
     
-    //if all maps are assigned, then ramp up all reduces irrespective of the 
-    //headroom
-    if (scheduledMaps == 0 && numPendingReduces > 0) {
-      LOG.info("All maps assigned. " +
-      		"Ramping up all remaining reduces:" + numPendingReduces);
-      scheduleAllReduces();
-      return;
-    }
-    
     //check for slow start
     if (!getIsReduceStarted()) {//not set yet
       int completedMapsForReduceSlowstart = (int)Math.ceil(reduceSlowStart * 
@@ -432,6 +432,15 @@ public class RMContainerAllocator extend
       }
     }
     
+    //if all maps are assigned, then ramp up all reduces irrespective of the
+    //headroom
+    if (scheduledMaps == 0 && numPendingReduces > 0) {
+      LOG.info("All maps assigned. " +
+          "Ramping up all remaining reduces:" + numPendingReduces);
+      scheduleAllReduces();
+      return;
+    }
+
     float completedMapPercent = 0f;
     if (totalMaps != 0) {//support for 0 maps
       completedMapPercent = (float)completedMaps/totalMaps;
@@ -489,7 +498,8 @@ public class RMContainerAllocator extend
     }
   }
 
-  private void scheduleAllReduces() {
+  @Private
+  public void scheduleAllReduces() {
     for (ContainerRequest req : pendingReduces) {
       scheduledRequests.addReduce(req);
     }
@@ -583,7 +593,9 @@ public class RMContainerAllocator extend
 
     //Called on each allocation. Will know about newly blacklisted/added hosts.
     computeIgnoreBlacklisting();
-    
+
+    handleUpdatedNodes(response);
+
     for (ContainerStatus cont : finishedContainers) {
       LOG.info("Received completed container " + cont.getContainerId());
       TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
@@ -600,10 +612,48 @@ public class RMContainerAllocator extend
         String diagnostics = cont.getDiagnostics();
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
             diagnostics));
-      }
+      }      
     }
     return newContainers;
   }
+  
+  @SuppressWarnings("unchecked")
+  private void handleUpdatedNodes(AMResponse response) {
+    // send event to the job about on updated nodes
+    List<NodeReport> updatedNodes = response.getUpdatedNodes();
+    if (!updatedNodes.isEmpty()) {
+
+      // send event to the job to act upon completed tasks
+      eventHandler.handle(new JobUpdatedNodesEvent(getJob().getID(),
+          updatedNodes));
+
+      // act upon running tasks
+      HashSet<NodeId> unusableNodes = new HashSet<NodeId>();
+      for (NodeReport nr : updatedNodes) {
+        NodeState nodeState = nr.getNodeState();
+        if (nodeState.isUnusable()) {
+          unusableNodes.add(nr.getNodeId());
+        }
+      }
+      for (int i = 0; i < 2; ++i) {
+        HashMap<TaskAttemptId, Container> taskSet = i == 0 ? assignedRequests.maps
+            : assignedRequests.reduces;
+        // kill running containers
+        for (Map.Entry<TaskAttemptId, Container> entry : taskSet.entrySet()) {
+          TaskAttemptId tid = entry.getKey();
+          NodeId taskAttemptNodeId = entry.getValue().getNodeId();
+          if (unusableNodes.contains(taskAttemptNodeId)) {
+            LOG.info("Killing taskAttempt:" + tid
+                + " because it is running on unusable node:"
+                + taskAttemptNodeId);
+            eventHandler.handle(new TaskAttemptKillEvent(tid,
+                "TaskAttempt killed because it ran on unusable node"
+                    + taskAttemptNodeId));
+          }
+        }
+      }
+    }
+  }
 
   @Private
   public int getMemLimit() {
@@ -743,7 +793,6 @@ public class RMContainerAllocator extend
         boolean blackListed = false;         
         ContainerRequest assigned = null;
         
-        ContainerId allocatedContainerId = allocated.getId();
         if (isAssignable) {
           // do not assign if allocated container is on a  
           // blacklisted host
@@ -790,7 +839,7 @@ public class RMContainerAllocator extend
               eventHandler.handle(new TaskAttemptContainerAssignedEvent(
                   assigned.attemptID, allocated, applicationACLs));
 
-              assignedRequests.add(allocatedContainerId, assigned.attemptID);
+              assignedRequests.add(allocated, assigned.attemptID);
 
               if (LOG.isDebugEnabled()) {
                 LOG.info("Assigned container (" + allocated + ") "
@@ -811,7 +860,7 @@ public class RMContainerAllocator extend
         // or if we could not assign it 
         if (blackListed || assigned == null) {
           containersReleased++;
-          release(allocatedContainerId);
+          release(allocated.getId());
         }
       }
     }
@@ -974,20 +1023,20 @@ public class RMContainerAllocator extend
   private class AssignedRequests {
     private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
       new HashMap<ContainerId, TaskAttemptId>();
-    private final LinkedHashMap<TaskAttemptId, ContainerId> maps = 
-      new LinkedHashMap<TaskAttemptId, ContainerId>();
-    private final LinkedHashMap<TaskAttemptId, ContainerId> reduces = 
-      new LinkedHashMap<TaskAttemptId, ContainerId>();
+    private final LinkedHashMap<TaskAttemptId, Container> maps = 
+      new LinkedHashMap<TaskAttemptId, Container>();
+    private final LinkedHashMap<TaskAttemptId, Container> reduces = 
+      new LinkedHashMap<TaskAttemptId, Container>();
     private final Set<TaskAttemptId> preemptionWaitingReduces = 
       new HashSet<TaskAttemptId>();
     
-    void add(ContainerId containerId, TaskAttemptId tId) {
-      LOG.info("Assigned container " + containerId.toString() + " to " + tId);
-      containerToAttemptMap.put(containerId, tId);
+    void add(Container container, TaskAttemptId tId) {
+      LOG.info("Assigned container " + container.getId().toString() + " to " + tId);
+      containerToAttemptMap.put(container.getId(), tId);
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        maps.put(tId, containerId);
+        maps.put(tId, container);
       } else {
-        reduces.put(tId, containerId);
+        reduces.put(tId, container);
       }
     }
 
@@ -1017,9 +1066,9 @@ public class RMContainerAllocator extend
     boolean remove(TaskAttemptId tId) {
       ContainerId containerId = null;
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        containerId = maps.remove(tId);
+        containerId = maps.remove(tId).getId();
       } else {
-        containerId = reduces.remove(tId);
+        containerId = reduces.remove(tId).getId();
         if (containerId != null) {
           boolean preempted = preemptionWaitingReduces.remove(tId);
           if (preempted) {
@@ -1038,12 +1087,20 @@ public class RMContainerAllocator extend
     TaskAttemptId get(ContainerId cId) {
       return containerToAttemptMap.get(cId);
     }
+    
+    NodeId getNodeId(TaskAttemptId tId) {
+      if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
+        return maps.get(tId).getNodeId();
+      } else {
+        return reduces.get(tId).getNodeId();
+      }
+    }
 
     ContainerId get(TaskAttemptId tId) {
       if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
-        return maps.get(tId);
+        return maps.get(tId).getId();
       } else {
-        return reduces.get(tId);
+        return reduces.get(tId).getId();
       }
     }
   }
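
The heart of this file's change is the new handleUpdatedNodes() path above;
condensed, it amounts to the sketch below. Only types that already appear in
the diff are used (NodeReport, NodeState, NodeId, Container, TaskAttemptId,
TaskAttemptKillEvent); the allAssigned() helper is a hypothetical stand-in for
iterating assignedRequests.maps and assignedRequests.reduces:

    // Condensed sketch of the unusable-node handling, not the committed
    // method itself; java.util.* and the YARN record imports are as in the
    // file above.
    private void killAttemptsOnUnusableNodes(List<NodeReport> updatedNodes) {
      // Collect the nodes the RM now reports as unusable (e.g. UNHEALTHY).
      Set<NodeId> unusableNodes = new HashSet<NodeId>();
      for (NodeReport nr : updatedNodes) {
        if (nr.getNodeState().isUnusable()) {
          unusableNodes.add(nr.getNodeId());
        }
      }
      // Any running attempt whose container sits on such a node is killed so
      // the scheduler can re-run it elsewhere; allAssigned() is hypothetical.
      for (Map.Entry<TaskAttemptId, Container> e : allAssigned()) {
        if (unusableNodes.contains(e.getValue().getNodeId())) {
          eventHandler.handle(new TaskAttemptKillEvent(e.getKey(),
              "TaskAttempt killed because it ran on unusable node "
                  + e.getValue().getNodeId()));
        }
      }
    }
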

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersPage.java Thu Jun 28 06:59:38 2012
@@ -28,9 +28,6 @@ public class CountersPage extends AppVie
   @Override protected void preHead(Page.HTML<_> html) {
     commonPreHead(html);
 
-    // Counters page is a summary. Helps to refresh automatically.
-    html.meta_http("refresh", "10");
-
     String tid = $(TASK_ID);
     String activeNav = "3";
     if(tid == null || tid.isEmpty()) {

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobPage.java Thu Jun 28 06:59:38 2012
@@ -33,9 +33,6 @@ public class JobPage extends AppView {
                : join("MapReduce Job ", $(JOB_ID)));
     commonPreHead(html);
 
-    // This is a job-summary page. Helps to refresh automatically.
-    html.meta_http("refresh", "10");
-
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:2}");
   }
 

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Thu Jun 28 06:59:38 2012
@@ -126,10 +126,6 @@ public class TaskPage extends AppView {
   @Override protected void preHead(Page.HTML<_> html) {
     commonPreHead(html);
 
-    // This page is a list of all attempts which are limited in number. Okay to
-    // refresh automatically.
-    html.meta_http("refresh", "10");
-
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:3}");
     set(DATATABLES_ID, "attempts");
     set(initID(DATATABLES, "attempts"), attemptsTableInit());

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Thu Jun 28 06:59:38 2012
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.MockApps;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -232,6 +233,11 @@ public class MockJobs extends MockApps {
     diags.add(DIAGS.next());
     return new TaskAttempt() {
       @Override
+      public NodeId getNodeId() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+      }
+      
+      @Override
       public TaskAttemptId getID() {
         return taid;
       }

Modified: hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1354832&r1=1354831&r2=1354832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/HDFS-3092/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Thu Jun 28 06:59:38 2012
@@ -22,6 +22,7 @@ import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 
+import java.util.ArrayList;
 import java.util.Iterator;
 
 import junit.framework.Assert;
@@ -29,17 +30,26 @@ import junit.framework.Assert;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.junit.Test;
 
 /**
@@ -160,6 +170,159 @@ public class TestMRApp {
     
     app.waitForState(job, JobState.SUCCEEDED);
   }
+  
+  /**
+   * The test verifies that the AM re-runs maps that have run on bad nodes. It
+   * also verifies that the AM records all success/killed events so that
+   * reduces are notified about map output status changes, and that the
+   * re-run information is preserved across an AM restart.
+   */
+  @Test
+  public void testUpdatedNodes() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    // after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    // uberization forces full slowstart (1.0), so disable that
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    NodeId node1 = task1Attempt.getNodeId();
+    NodeId node2 = task2Attempt.getNodeId();
+    Assert.assertEquals(node1, node2);
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // all maps must be succeeded
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0,
+        100);
+    Assert.assertEquals("Expecting 2 completion events for success", 2,
+        events.length);
+
+    // send updated nodes info
+    ArrayList<NodeReport> updatedNodes = new ArrayList<NodeReport>();
+    NodeReport nr = RecordFactoryProvider.getRecordFactory(null)
+        .newRecordInstance(NodeReport.class);
+    nr.setNodeId(node1);
+    nr.setNodeState(NodeState.UNHEALTHY);
+    updatedNodes.add(nr);
+    app.getContext().getEventHandler()
+        .handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes));
+
+    app.waitForState(task1Attempt, TaskAttemptState.KILLED);
+    app.waitForState(task2Attempt, TaskAttemptState.KILLED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 2 more completion events for killed", 4,
+        events.length);
+
+    // all maps must be back to running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    task1Attempt = itr.next();
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // map1 must be succeeded. map2 must be running
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 5,
+        events.length);
+
+    // Crash the app again.
+    app.stop();
+
+    // rerun
+    // in rerun the 1st map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // map 1 will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals(
+        "Expecting 2 completion events for killed & success of map1", 2,
+        events.length);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 3,
+        events.length);
+
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
+        .next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task3Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(reduceTask, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 4,
+        events.length);
+
+    // job succeeds
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
 
   @Test
   public void testJobError() throws Exception {
@@ -194,10 +357,6 @@ public class TestMRApp {
       ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
       return spiedJob;
     }
-
-    JobImpl getSpiedJob() {
-      return this.spiedJob;
-    }
   }
 
   @Test
@@ -232,6 +391,21 @@ public class TestMRApp {
       TypeConverter.fromYarn(state);
     }
   }
+  
+  private final class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context, 
+          getStartCount());
+      return eventHandler;
+    }
+  }
 
   public static void main(String[] args) throws Exception {
     TestMRApp t = new TestMRApp();


