hadoop-mapreduce-commits mailing list archives

From: szets...@apache.org
Subject: svn commit: r1453669 [1/2] - in /hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoo...
Date: Thu, 07 Mar 2013 02:57:55 GMT
Author: szetszwo
Date: Thu Mar  7 02:57:40 2013
New Revision: 1453669

URL: http://svn.apache.org/r1453669
Log:
Merge r1449958 through r1453659 from trunk.

Modified:
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/bin/mapred
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/OnDiskMapOutput.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (contents, props changed)
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestMergeManager.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/CompletedTaskAttempt.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/MiniMRClientClusterFactory.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestMapProgress.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobsWithHistoryService.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/testshell/ExternalMapReduce.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedChunkedFile.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/FadvisedFileRegion.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
    hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/pom.xml

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1449958-1453659

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt Thu Mar  7 02:57:40 2013
@@ -14,6 +14,10 @@ Trunk (Unreleased)
     MAPREDUCE-4887. Add RehashPartitioner, to smooth distributions
     with poor implementations of Object#hashCode().  (Radim Kolar via cutting)
 
+    HADOOP-8562. Enhancements to support Hadoop on Windows Server and Windows
+    Azure environments. (See breakdown of tasks below for subtasks and
+    contributors)
+
   IMPROVEMENTS
 
     MAPREDUCE-3787. [Gridmix] Optimize job monitoring and STRESS mode for
@@ -155,6 +159,27 @@ Trunk (Unreleased)
     MAPREDUCE-5012. Typo in javadoc for IdentityMapper class. (Adam Monsen
     via suresh)
 
+  BREAKDOWN OF HADOOP-8562 SUBTASKS
+
+    MAPREDUCE-4739. Some MapReduce tests fail to find winutils.
+    (Chris Nauroth via suresh)
+
+    MAPREDUCE-4780. MapReduce distribution build fails on Windows.
+    (Chris Nauroth via suresh)
+
+    MAPREDUCE-4790. MapReduce build script would be more readable using abspath.
+    (Chris Nauroth via suresh)
+
+    MAPREDUCE-4869. Fix TestMapReduceChildJVM. (Chris Nauroth via acmurthy)
+
+    MAPREDUCE-4870. Fix TestMRJobsWithHistoryService. (Chris Nauroth via acmurthy)
+
+    MAPREDUCE-4983. Fixed various platform specific assumptions in various tests,
+    so that they can pass on Windows too. (Chris Nauroth via vinodkv)
+
+    HADOOP-9372. Fix bad timeout annotations on tests.
+    (Arpit Agarwal via suresh)
+
 Release 2.0.4-beta - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -163,6 +188,12 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5033. mapred shell script should respect usage flags
+    (--help -help -h). (Andrew Wang via atm)
+
+    MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+    allocation on small clusters. (Bikas Saha via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -186,6 +217,19 @@ Release 2.0.4-beta - UNRELEASED
     MAPREDUCE-4951. Container preemption interpreted as task failure.
     (Sandy Ryza via tomwhite)
 
+    MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
+    (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-4693. History server should include counters for failed tasks.
+    (Xuan Gong via sseth)
+
+    MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does 
+    not exist. (sandyr via tucu)
+
+    MAPREDUCE-3685. Fix bugs in MergeManager to ensure compression codec is
+    appropriately used and that on-disk segments are correctly sorted on
+    file-size. (Anty Rao and Ravi Prakash via acmurthy) 
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES
@@ -716,6 +760,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi
     Prakash via jlowe)
 
+    MAPREDUCE-5027. Shuffle does not limit number of outstanding connections
+    (Robert Parker via jeagles)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
@@ -735,6 +782,15 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
     the value from the Task commitAttempt member (Robert Parker via jeagles)
 
+    MAPREDUCE-4871. AM uses mapreduce.jobtracker.split.metainfo.maxsize but
+    mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
+    jeagles)
+
+    MAPREDUCE-4794. DefaultSpeculator generates error messages on normal
+    shutdown (Jason Lowe via jeagles)
+
+    MAPREDUCE-5043. Fetch failure processing can cause AM event queue to
+    backup and eventually OOM (Jason Lowe via bobby)
 
 Release 0.23.6 - UNRELEASED
 

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1449958-1453659

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/bin/mapred
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/bin/mapred?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/bin/mapred (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/bin/mapred Thu Mar  7 02:57:40 2013
@@ -50,6 +50,14 @@ fi
 COMMAND=$1
 shift
 
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+esac
+
 if [ "$COMMAND" = "job" ] ; then
   CLASS=org.apache.hadoop.mapred.JobClient
 elif [ "$COMMAND" = "queue" ] ; then
@@ -127,10 +135,6 @@ for f in $HADOOP_MAPRED_HOME/modules/*.j
   CLASSPATH=${CLASSPATH}:$f;
 done
 
-if $cygwin; then
-  CLASSPATH=`cygpath -p -w "$CLASSPATH"`
-fi
-
 if [ "$COMMAND" = "classpath" ] ; then
   echo $CLASSPATH
   exit

Propchange: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1449958-1453659

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/MapReduceChildJVM.java Thu Mar  7 02:57:40 2013
@@ -164,7 +164,6 @@ public class MapReduceChildJVM {
 
     Vector<String> vargs = new Vector<String>(8);
 
-    vargs.add("exec");
     vargs.add(Environment.JAVA_HOME.$() + "/bin/java");
 
     // Add child (task) java-vm options.

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Thu Mar  7 02:57:40 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.util.List;
 
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
@@ -37,6 +38,7 @@ public interface TaskAttempt {
   List<String> getDiagnostics();
   Counters getCounters();
   float getProgress();
+  Phase getPhase();
   TaskAttemptState getState();
 
   /** 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Mar  7 02:57:40 2013
@@ -1672,6 +1672,20 @@ public class JobImpl implements org.apac
       SingleArcTransition<JobImpl, JobEvent> {
     @Override
     public void transition(JobImpl job, JobEvent event) {
+      //get number of shuffling reduces
+      int shufflingReduceTasks = 0;
+      for (TaskId taskId : job.reduceTasks) {
+        Task task = job.tasks.get(taskId);
+        if (TaskState.RUNNING.equals(task.getState())) {
+          for(TaskAttempt attempt : task.getAttempts().values()) {
+            if(attempt.getPhase() == Phase.SHUFFLE) {
+              shufflingReduceTasks++;
+              break;
+            }
+          }
+        }
+      }
+
       JobTaskAttemptFetchFailureEvent fetchfailureEvent = 
         (JobTaskAttemptFetchFailureEvent) event;
       for (org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId mapId : 
@@ -1680,20 +1694,6 @@ public class JobImpl implements org.apac
         fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
         job.fetchFailuresMapping.put(mapId, fetchFailures);
         
-        //get number of shuffling reduces
-        int shufflingReduceTasks = 0;
-        for (TaskId taskId : job.reduceTasks) {
-          Task task = job.tasks.get(taskId);
-          if (TaskState.RUNNING.equals(task.getState())) {
-            for(TaskAttempt attempt : task.getAttempts().values()) {
-              if(attempt.getReport().getPhase() == Phase.SHUFFLE) {
-                shufflingReduceTasks++;
-                break;
-              }
-            }
-          }
-        }
-        
         float failureRate = shufflingReduceTasks == 0 ? 1.0f : 
           (float) fetchFailures / shufflingReduceTasks;
         // declare faulty if fetch-failures >= max-allowed-failures

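For illustration, here is a self-contained sketch of the restructuring above (class and method names are hypothetical, not JobImpl's API): the number of reduces still in the SHUFFLE phase is the same for every failed map in one event, so counting them once before the loop, as the hunk does, cuts each fetch-failure event from a scan of the reduces per failed map down to a single scan. Per CHANGES.txt above, this is part of the MAPREDUCE-5043 fix for AM event-queue backup.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FetchFailureRateSketch {
      private final Map<String, Integer> fetchFailuresMapping =
          new HashMap<String, Integer>();

      // shufflingReduces is counted once per event by the caller, as above.
      void onFetchFailures(List<String> failedMapIds, int shufflingReduces) {
        for (String mapId : failedMapIds) {
          Integer prev = fetchFailuresMapping.get(mapId);
          int fetchFailures = (prev == null) ? 1 : prev + 1;
          fetchFailuresMapping.put(mapId, fetchFailures);
          float failureRate = shufflingReduces == 0
              ? 1.0f : (float) fetchFailures / shufflingReduces;
          System.out.printf("%s: failure rate %.2f%n", mapId, failureRate);
        }
      }
    }
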
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar  7 02:57:40 2013
@@ -994,6 +994,16 @@ public abstract class TaskAttemptImpl im
   }
 
   @Override
+  public Phase getPhase() {
+    readLock.lock();
+    try {
+      return reportedStatus.phase;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
   public TaskAttemptState getState() {
     readLock.lock();
     try {
@@ -1183,7 +1193,8 @@ public abstract class TaskAttemptImpl im
             taskAttempt.nodeRackName == null ? "UNKNOWN" 
                 : taskAttempt.nodeRackName,
             StringUtils.join(
-                LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+                LINE_SEPARATOR, taskAttempt.getDiagnostics()),
+                taskAttempt.getCounters(), taskAttempt
                 .getProgressSplitBlock().burst());
     return tauce;
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar  7 02:57:40 2013
@@ -730,7 +730,8 @@ public abstract class TaskImpl implement
         TypeConverter.fromYarn(task.getType()),
         errorSb.toString(),
         taskState.toString(),
-        taId == null ? null : TypeConverter.fromYarn(taId));
+        taId == null ? null : TypeConverter.fromYarn(taId),
+        task.getCounters());
     return taskFailedEvent;
   }
   

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/speculate/DefaultSpeculator.java Thu Mar  7 02:57:40 2013
@@ -91,6 +91,7 @@ public class DefaultSpeculator extends A
   private final Configuration conf;
   private AppContext context;
   private Thread speculationBackgroundThread = null;
+  private volatile boolean stopped = false;
   private BlockingQueue<SpeculatorEvent> eventQueue
       = new LinkedBlockingQueue<SpeculatorEvent>();
   private TaskRuntimeEstimator estimator;
@@ -170,7 +171,7 @@ public class DefaultSpeculator extends A
         = new Runnable() {
             @Override
             public void run() {
-              while (!Thread.currentThread().isInterrupted()) {
+              while (!stopped && !Thread.currentThread().isInterrupted()) {
                 long backgroundRunStartTime = clock.getTime();
                 try {
                   int speculations = computeSpeculations();
@@ -189,8 +190,9 @@ public class DefaultSpeculator extends A
                   Object pollResult
                       = scanControl.poll(wait, TimeUnit.MILLISECONDS);
                 } catch (InterruptedException e) {
-                  LOG.error("Background thread returning, interrupted : " + e);
-                  e.printStackTrace(System.out);
+                  if (!stopped) {
+                    LOG.error("Background thread returning, interrupted", e);
+                  }
                   return;
                 }
               }
@@ -205,6 +207,7 @@ public class DefaultSpeculator extends A
 
   @Override
   public void stop() {
+    stopped = true;
     // this could be called before background thread is established
     if (speculationBackgroundThread != null) {
       speculationBackgroundThread.interrupt();

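The DefaultSpeculator hunks above implement the MAPREDUCE-4794 fix listed in CHANGES.txt: a volatile stopped flag is set before interrupt(), so the background loop can tell a deliberate stop() from an unexpected interruption and skip the error log. A minimal standalone sketch of that pattern (illustrative names only, not the Hadoop class):

    public class StoppableBackgroundThread {
      private volatile boolean stopped = false;
      private Thread backgroundThread = null;

      public void start() {
        backgroundThread = new Thread(new Runnable() {
          @Override
          public void run() {
            while (!stopped && !Thread.currentThread().isInterrupted()) {
              try {
                Thread.sleep(1000);  // stand-in for one speculation pass
              } catch (InterruptedException e) {
                if (!stopped) {
                  System.err.println("Background thread returning, interrupted: " + e);
                }
                return;  // quiet exit when stop() triggered the interrupt
              }
            }
          }
        });
        backgroundThread.start();
      }

      public void stop() {
        stopped = true;  // must be set before interrupt() so the catch stays quiet
        if (backgroundThread != null) {
          backgroundThread.interrupt();
        }
      }
    }
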
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockJobs.java Thu Mar  7 02:57:40 2013
@@ -277,6 +277,11 @@ public class MockJobs extends MockApps {
       }
 
       @Override
+      public Phase getPhase() {
+        return report.getPhase();
+      }
+
+      @Override
       public TaskAttemptState getState() {
         return report.getTaskAttemptState();
       }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java Thu Mar  7 02:57:40 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -639,6 +640,11 @@ public class TestRuntimeEstimators {
     }
 
     @Override
+    public Phase getPhase() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    @Override
     public TaskAttemptState getState() {
       if (overridingState != null) {
         return overridingState;

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestMapReduceChildJVM.java Thu Mar  7 02:57:40 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.junit.Test;
 
@@ -37,7 +38,7 @@ public class TestMapReduceChildJVM {
 
   private static final Log LOG = LogFactory.getLog(TestMapReduceChildJVM.class);
 
-  @Test
+  @Test (timeout = 30000)
   public void testCommandLine() throws Exception {
 
     MyMRApp app = new MyMRApp(1, 0, true, this.getClass().getName(), true);
@@ -46,10 +47,10 @@ public class TestMapReduceChildJVM {
     app.verifyCompleted();
 
     Assert.assertEquals(
-      "[exec $JAVA_HOME/bin/java" +
+      "[" + envVar("JAVA_HOME") + "/bin/java" +
       " -Djava.net.preferIPv4Stack=true" +
       " -Dhadoop.metrics.log.level=WARN" +
-      "  -Xmx200m -Djava.io.tmpdir=$PWD/tmp" +
+      "  -Xmx200m -Djava.io.tmpdir=" + envVar("PWD") + "/tmp" +
       " -Dlog4j.configuration=container-log4j.properties" +
       " -Dyarn.app.mapreduce.container.log.dir=<LOG_DIR>" +
       " -Dyarn.app.mapreduce.container.log.filesize=0" +
@@ -88,4 +89,16 @@ public class TestMapReduceChildJVM {
       };
     }
   }
+
+  /**
+   * Returns platform-specific string for retrieving the value of an environment
+   * variable with the given name.  On Unix, this returns $name.  On Windows,
+   * this returns %name%.
+   * 
+   * @param name String environment variable name
+   * @return String for retrieving value of environment variable
+   */
+  private static String envVar(String name) {
+    return Shell.WINDOWS ? '%' + name + '%' : '$' + name;
+  }
 }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/test/java/org/apache/hadoop/mapreduce/v2/util/TestMRApps.java Thu Mar  7 02:57:40 2013
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,6 +40,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
@@ -69,31 +72,35 @@ public class TestMRApps {
   }
   
   private static void delete(File dir) throws IOException {
-    Path p = new Path("file://"+dir.getAbsolutePath());
     Configuration conf = new Configuration();
-    FileSystem fs = p.getFileSystem(conf);
+    FileSystem fs = FileSystem.getLocal(conf);
+    Path p = fs.makeQualified(new Path(dir.getAbsolutePath()));
     fs.delete(p, true);
   }
 
-  @Test public void testJobIDtoString() {
+  @Test (timeout = 120000)
+  public void testJobIDtoString() {
     JobId jid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class);
     jid.setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
     assertEquals("job_0_0000", MRApps.toString(jid));
   }
 
-  @Test public void testToJobID() {
+  @Test (timeout = 120000)
+  public void testToJobID() {
     JobId jid = MRApps.toJobID("job_1_1");
     assertEquals(1, jid.getAppId().getClusterTimestamp());
     assertEquals(1, jid.getAppId().getId());
     assertEquals(1, jid.getId()); // tests against some proto.id and not a job.id field
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testJobIDShort() {
+  @Test (timeout = 120000, expected=IllegalArgumentException.class)
+  public void testJobIDShort() {
     MRApps.toJobID("job_0_0_0");
   }
 
   //TODO_get.set
-  @Test public void testTaskIDtoString() {
+  @Test (timeout = 120000)
+  public void testTaskIDtoString() {
     TaskId tid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class);
     tid.setJobId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(JobId.class));
     tid.getJobId().setAppId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class));
@@ -108,7 +115,8 @@ public class TestMRApps {
     assertEquals("task_0_0000_r_000000", MRApps.toString(tid));
   }
 
-  @Test public void testToTaskID() {
+  @Test (timeout = 120000)
+  public void testToTaskID() {
     TaskId tid = MRApps.toTaskID("task_1_2_r_3");
     assertEquals(1, tid.getJobId().getAppId().getClusterTimestamp());
     assertEquals(2, tid.getJobId().getAppId().getId());
@@ -120,16 +128,19 @@ public class TestMRApps {
     assertEquals(TaskType.MAP, tid.getTaskType());
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskIDShort() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskIDShort() {
     MRApps.toTaskID("task_0_0000_m");
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskIDBadType() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskIDBadType() {
     MRApps.toTaskID("task_0_0000_x_000000");
   }
 
   //TODO_get.set
-  @Test public void testTaskAttemptIDtoString() {
+  @Test (timeout = 120000)
+  public void testTaskAttemptIDtoString() {
     TaskAttemptId taid = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskAttemptId.class);
     taid.setTaskId(RecordFactoryProvider.getRecordFactory(null).newRecordInstance(TaskId.class));
     taid.getTaskId().setTaskType(TaskType.MAP);
@@ -138,7 +149,8 @@ public class TestMRApps {
     assertEquals("attempt_0_0000_m_000000_0", MRApps.toString(taid));
   }
 
-  @Test public void testToTaskAttemptID() {
+  @Test (timeout = 120000)
+  public void testToTaskAttemptID() {
     TaskAttemptId taid = MRApps.toTaskAttemptID("attempt_0_1_m_2_3");
     assertEquals(0, taid.getTaskId().getJobId().getAppId().getClusterTimestamp());
     assertEquals(1, taid.getTaskId().getJobId().getAppId().getId());
@@ -147,11 +159,13 @@ public class TestMRApps {
     assertEquals(3, taid.getId());
   }
 
-  @Test(expected=IllegalArgumentException.class) public void testTaskAttemptIDShort() {
+  @Test(timeout = 120000, expected=IllegalArgumentException.class) 
+  public void testTaskAttemptIDShort() {
     MRApps.toTaskAttemptID("attempt_0_0_0_m_0");
   }
 
-  @Test public void testGetJobFileWithUser() {
+  @Test (timeout = 120000)
+  public void testGetJobFileWithUser() {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, "/my/path/to/staging");
     String jobFile = MRApps.getJobFile(conf, "dummy-user", 
@@ -161,49 +175,57 @@ public class TestMRApps {
         "/my/path/to/staging/dummy-user/.staging/job_dummy-job_12345/job.xml", jobFile);
   }
 
-  @Test public void testSetClasspath() throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspath() throws IOException {
     Job job = Job.getInstance();
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, job.getConfiguration());
-    assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
+    assertTrue(environment.get("CLASSPATH").startsWith(
+      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
     String yarnAppClasspath = 
         job.getConfiguration().get(
             YarnConfiguration.YARN_APPLICATION_CLASSPATH);
     if (yarnAppClasspath != null) {
-      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", ":").trim();
+      yarnAppClasspath = yarnAppClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(yarnAppClasspath));
     String mrAppClasspath = 
         job.getConfiguration().get(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH);
     if (mrAppClasspath != null) {
-      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", ":").trim();
+      mrAppClasspath = mrAppClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(mrAppClasspath));
   }
   
-  @Test public void testSetClasspathWithArchives () throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithArchives () throws IOException {
     File testTGZ = new File(testWorkDir, "test.tgz");
     FileOutputStream out = new FileOutputStream(testTGZ);
     out.write(0);
     out.close();
     Job job = Job.getInstance();
     Configuration conf = job.getConfiguration();
-    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, "file://" 
-        + testTGZ.getAbsolutePath());
-    conf.set(MRJobConfig.CACHE_ARCHIVES, "file://"
-        + testTGZ.getAbsolutePath() + "#testTGZ");
+    String testTGZQualifiedPath = FileSystem.getLocal(conf).makeQualified(new Path(
+      testTGZ.getAbsolutePath())).toString();
+    conf.set(MRJobConfig.CLASSPATH_ARCHIVES, testTGZQualifiedPath);
+    conf.set(MRJobConfig.CACHE_ARCHIVES, testTGZQualifiedPath + "#testTGZ");
     Map<String, String> environment = new HashMap<String, String>();
     MRApps.setClasspath(environment, conf);
-    assertTrue(environment.get("CLASSPATH").startsWith("$PWD:"));
+    assertTrue(environment.get("CLASSPATH").startsWith(
+      ApplicationConstants.Environment.PWD.$() + File.pathSeparator));
     String confClasspath = job.getConfiguration().get(YarnConfiguration.YARN_APPLICATION_CLASSPATH);
     if (confClasspath != null) {
-      confClasspath = confClasspath.replaceAll(",\\s*", ":").trim();
+      confClasspath = confClasspath.replaceAll(",\\s*", File.pathSeparator)
+        .trim();
     }
     assertTrue(environment.get("CLASSPATH").contains(confClasspath));
     assertTrue(environment.get("CLASSPATH").contains("testTGZ"));
   }
 
- @Test public void testSetClasspathWithUserPrecendence() {
+ @Test (timeout = 120000)
+ public void testSetClasspathWithUserPrecendence() {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, true);
     Map<String, String> env = new HashMap<String, String>();
@@ -213,11 +235,16 @@ public class TestMRApps {
       fail("Got exception while setting classpath");
     }
     String env_str = env.get("CLASSPATH");
-    assertSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
-      env_str.indexOf("$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*"), 0);
+    String expectedClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
+        "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST set, but not taking effect!",
+      env_str.startsWith(expectedClasspath));
   }
 
-  @Test public void testSetClasspathWithNoUserPrecendence() {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithNoUserPrecendence() {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST, false);
     Map<String, String> env = new HashMap<String, String>();
@@ -227,31 +254,36 @@ public class TestMRApps {
       fail("Got exception while setting classpath");
     }
     String env_str = env.get("CLASSPATH");
-    int index = 
-         env_str.indexOf("job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*");
-    assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not"
-            + " in the classpath!", index, -1);
-    assertNotSame("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
-      index, 0);
+    String expectedClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList("job.jar/job.jar", "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertTrue("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, and job.jar is not in"
+      + " the classpath!", env_str.contains(expectedClasspath));
+    assertFalse("MAPREDUCE_JOB_USER_CLASSPATH_FIRST false, but taking effect!",
+      env_str.startsWith(expectedClasspath));
   }
   
-  @Test public void testSetClasspathWithJobClassloader() throws IOException {
+  @Test (timeout = 120000)
+  public void testSetClasspathWithJobClassloader() throws IOException {
     Configuration conf = new Configuration();
     conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_CLASSLOADER, true);
     Map<String, String> env = new HashMap<String, String>();
     MRApps.setClasspath(env, conf);
     String cp = env.get("CLASSPATH");
     String appCp = env.get("APP_CLASSPATH");
-    assertSame("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is"
-        + " in the classpath!", cp.indexOf("jar:job"), -1);
-    assertSame("MAPREDUCE_JOB_CLASSLOADER true, but PWD is"
-        + " in the classpath!", cp.indexOf("PWD"), -1);
-    assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not"
-        + " in the app classpath!",
-        "$PWD:job.jar/job.jar:job.jar/classes/:job.jar/lib/*:$PWD/*", appCp);
+    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is in the"
+      + " classpath!", cp.contains("jar" + File.pathSeparator + "job"));
+    assertFalse("MAPREDUCE_JOB_CLASSLOADER true, but PWD is in the classpath!",
+      cp.contains("PWD"));
+    String expectedAppClasspath = StringUtils.join(File.pathSeparator,
+      Arrays.asList(ApplicationConstants.Environment.PWD.$(), "job.jar/job.jar",
+        "job.jar/classes/", "job.jar/lib/*",
+        ApplicationConstants.Environment.PWD.$() + "/*"));
+    assertEquals("MAPREDUCE_JOB_CLASSLOADER true, but job.jar is not in the app"
+      + " classpath!", expectedAppClasspath, appCp);
   }
   
-  @Test
+  @Test (timeout = 30000)
   public void testSetupDistributedCacheEmpty() throws IOException {
     Configuration conf = new Configuration();
     Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -261,7 +293,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test(expected = InvalidJobConfException.class)
+  @Test(timeout = 120000, expected = InvalidJobConfException.class)
   public void testSetupDistributedCacheConflicts() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
@@ -292,7 +324,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test(expected = InvalidJobConfException.class)
+  @Test(timeout = 120000, expected = InvalidJobConfException.class)
   public void testSetupDistributedCacheConflictsFiles() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
@@ -320,7 +352,7 @@ public class TestMRApps {
   }
   
   @SuppressWarnings("deprecation")
-  @Test
+  @Test (timeout = 30000)
   public void testSetupDistributedCache() throws Exception {
     Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);

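The recurring edit in TestMRApps above is one portability fix applied many times: hard-coded ':' separators and "$PWD" become File.pathSeparator and Environment.PWD.$(), which resolve correctly on Windows as well as Unix. A tiny self-contained illustration of the separator half (not part of the patch):

    import java.io.File;
    import java.util.Arrays;

    public class PathSeparatorSketch {
      public static void main(String[] args) {
        StringBuilder cp = new StringBuilder();
        for (String part : Arrays.asList("job.jar/job.jar", "job.jar/classes/",
            "job.jar/lib/*")) {
          if (cp.length() > 0) {
            cp.append(File.pathSeparator);  // ':' on Unix, ';' on Windows
          }
          cp.append(part);
        }
        System.out.println(cp);
      }
    }
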
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Thu Mar  7 02:57:40 2013
@@ -212,6 +212,7 @@
           {"name": "rackname", "type": "string"},
           {"name": "status", "type": "string"},
           {"name": "error", "type": "string"},
+          {"name": "counters", "type": "JhCounters"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
           {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
           {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@@ -226,7 +227,8 @@
           {"name": "finishTime", "type": "long"},
           {"name": "error", "type": "string"},
           {"name": "failedDueToAttempt", "type": ["null", "string"] },
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Thu Mar  7 02:57:40 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -453,7 +454,7 @@ public class JobConf extends Configurati
    * @param cls the example class.
    */
   public void setJarByClass(Class cls) {
-    String jar = findContainingJar(cls);
+    String jar = ClassUtil.findContainingJar(cls);
     if (jar != null) {
       setJar(jar);
     }   
@@ -1811,7 +1812,7 @@ public class JobConf extends Configurati
     return 
     (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
   }
-  
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
@@ -1822,35 +1823,9 @@ public class JobConf extends Configurati
    * @throws IOException
    */
   public static String findContainingJar(Class my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for(Enumeration itr = loader.getResources(class_file);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
+    return ClassUtil.findContainingJar(my_class);
   }
 
-
   /**
    * Get the memory required to run a task of this job, in bytes. See
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}

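The body deleted from findContainingJar above (now delegated to ClassUtil.findContainingJar) carried a subtle point worth keeping in mind: URLDecoder decodes x-www-form-urlencoded data, in which '+' means a space, so a literal '+' in a jar path must be escaped to "%2B" before decoding. A standalone demonstration of just that quirk:

    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;

    public class PlusDecodingSketch {
      public static void main(String[] args) throws UnsupportedEncodingException {
        String path = "/tmp/my+lib.jar";
        // Decoded naively, '+' collapses into a space:
        System.out.println(URLDecoder.decode(path, "UTF-8"));
        // Escaping '+' to %2B first keeps the path intact:
        System.out.println(URLDecoder.decode(path.replaceAll("\\+", "%2B"), "UTF-8"));
      }
    }
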
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Merger.java Thu Mar  7 02:57:40 2013
@@ -169,7 +169,7 @@ public class Merger {  
   }
 
 
-  static <K extends Object, V extends Object>
+  public static <K extends Object, V extends Object>
   RawKeyValueIterator merge(Configuration conf, FileSystem fs,
                           Class<K> keyClass, Class<V> valueClass,
                           CompressionCodec codec,

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Mar  7 02:57:40 2013
@@ -63,6 +63,9 @@ public interface MRJobConfig {
 
   public static final String SPLIT_FILE = "mapreduce.job.splitfile";
 
+  public static final String SPLIT_METAINFO_MAXSIZE = "mapreduce.job.split.metainfo.maxsize";
+  public static final long DEFAULT_SPLIT_METAINFO_MAXSIZE = 10000000L;
+
   public static final String NUM_MAPS = "mapreduce.job.maps";
 
   public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";

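The two constants added above back the MAPREDUCE-4871 entry in CHANGES.txt: code can now read the mapreduce.job.split.metainfo.maxsize key that mapred-default.xml documents. A minimal sketch of reading the cap (only the constants come from this patch; the wrapper class is illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SplitMetaInfoSizeSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        long maxSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
            MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);  // 10000000 bytes by default
        System.out.println("split meta-info cap: " + maxSize + " bytes");
      }
    }
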
Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Thu Mar  7 02:57:40 2013
@@ -295,6 +295,7 @@ public class JobHistoryParser implements
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();
+    attemptInfo.counters = event.getCounters();
     if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
     {
       //this is a successful task
@@ -347,6 +348,7 @@ public class JobHistoryParser implements
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
+    taskInfo.counters = event.getCounters();
     info.errorInfo = "Task " + taskInfo.taskId +" failed " +
     taskInfo.attemptsMap.size() + " times ";
   }

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Thu Mar  7 02:57:40 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.jobh
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -36,8 +37,24 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
-  private TaskAttemptUnsuccessfulCompletion datum =
-    new TaskAttemptUnsuccessfulCompletion();
+
+  private TaskAttemptUnsuccessfulCompletion datum = null;
+
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String status;
+  private long finishTime;
+  private String hostname;
+  private int port;
+  private String rackName;
+  private String error;
+  private Counters counters;
+  int[][] allSplits;
+  int[] clockSplits;
+  int[] cpuUsages;
+  int[] vMemKbytes;
+  int[] physMemKbytes;
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /** 
    * Create an event to record the unsuccessful completion of attempts
@@ -49,6 +66,7 @@ public class TaskAttemptUnsuccessfulComp
   * @param port rpc port for the tracker
    * @param rackName Name of the rack where the attempt executed
    * @param error Error string
+   * @param counters Counters for the attempt
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
    *        Currently there are four; wallclock time, CPU time,
@@ -58,31 +76,25 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime,
         String hostname, int port, String rackName,
-        String error, int[][] allSplits) {
-    datum.taskid = new Utf8(id.getTaskID().toString());
-    datum.taskType = new Utf8(taskType.name());
-    datum.attemptId = new Utf8(id.toString());
-    datum.finishTime = finishTime;
-    datum.hostname = new Utf8(hostname);
-    if (rackName != null) {
-      datum.rackname = new Utf8(rackName);
-    }
-    datum.port = port;
-    datum.error = new Utf8(error);
-    datum.status = new Utf8(status);
-
-    datum.clockSplits 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
-    datum.cpuUsages 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
-    datum.vMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
-    datum.physMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+        String error, Counters counters, int[][] allSplits) {
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.status = status;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.port = port;
+    this.rackName = rackName;
+    this.error = error;
+    this.counters = counters;
+    this.allSplits = allSplits;
+    this.clockSplits =
+        ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
+    this.cpuUsages =
+        ProgressSplitsBlock.arrayGetCPUTime(allSplits);
+    this.vMemKbytes =
+        ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
+    this.physMemKbytes =
+        ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
   }
 
   /** 
@@ -103,42 +115,109 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime, 
         String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, "", error, null);
+    this(id, taskType, status, finishTime, hostname, -1, "",
+        error, EMPTY_COUNTERS, null);
+  }
+  
+  public TaskAttemptUnsuccessfulCompletionEvent
+      (TaskAttemptID id, TaskType taskType,
+       String status, long finishTime,
+       String hostname, int port, String rackName,
+       String error, int[][] allSplits) {
+    this(id, taskType, status, finishTime, hostname, port,
+        rackName, error, EMPTY_COUNTERS, allSplits);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) {
-    this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
+  public Object getDatum() {
+    if (datum == null) {
+      datum = new TaskAttemptUnsuccessfulCompletion();
+      datum.taskid = new Utf8(attemptId.getTaskID().toString());
+      datum.taskType = new Utf8(taskType.name());
+      datum.attemptId = new Utf8(attemptId.toString());
+      datum.finishTime = finishTime;
+      datum.hostname = new Utf8(hostname);
+      if (rackName != null) {
+        datum.rackname = new Utf8(rackName);
+      }
+      datum.port = port;
+      datum.error = new Utf8(error);
+      datum.status = new Utf8(status);
+
+      datum.counters = EventWriter.toAvro(counters);
+
+      datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetWallclockTime(allSplits));
+      datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetCPUTime(allSplits));
+      datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetVMemKbytes(allSplits));
+      datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetPhysMemKbytes(allSplits));
+    }
+    return datum;
+  }
+
+  public void setDatum(Object odatum) {
+    this.datum =
+        (TaskAttemptUnsuccessfulCompletion)odatum;
+    this.attemptId =
+        TaskAttemptID.forName(datum.attemptId.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.hostname = datum.hostname.toString();
+    this.rackName = datum.rackname == null
+        ? null : datum.rackname.toString();
+    this.port = datum.port;
+    this.status = datum.status.toString();
+    this.error = datum.error.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+    this.clockSplits =
+        AvroArrayUtils.fromAvro(datum.clockSplits);
+    this.cpuUsages =
+        AvroArrayUtils.fromAvro(datum.cpuUsages);
+    this.vMemKbytes =
+        AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    this.physMemKbytes =
+        AvroArrayUtils.fromAvro(datum.physMemKbytes);
   }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() {
+    return attemptId.getTaskID();
+  }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return taskType;
   }
   /** Get the attempt id */
   public TaskAttemptID getTaskAttemptId() {
-    return TaskAttemptID.forName(datum.attemptId.toString());
+    return attemptId;
   }
   /** Get the finish time */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return finishTime; }
   /** Get the name of the host where the attempt executed */
-  public String getHostname() { return datum.hostname.toString(); }
+  public String getHostname() { return hostname; }
   /** Get the rpc port for the host where the attempt executed */
-  public int getPort() { return datum.port; }
+  public int getPort() { return port; }
   
   /** Get the rack name of the node where the attempt ran */
   public String getRackName() {
-    return datum.rackname == null ? null : datum.rackname.toString();
+    return rackName;
   }
   
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error; }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() { return status; }
+  /** Get the counters */
+  Counters getCounters() { return counters; }
   /** Get the event type */
   public EventType getEventType() {
     // Note that the task type can be setup/map/reduce/cleanup but the 
@@ -157,16 +236,16 @@ public class TaskAttemptUnsuccessfulComp
 
 
   public int[] getClockSplits() {
-    return AvroArrayUtils.fromAvro(datum.clockSplits);
+    return clockSplits;
   }
   public int[] getCpuUsages() {
-    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+    return cpuUsages;
   }
   public int[] getVMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    return vMemKbytes;
   }
   public int[] getPhysMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+    return physMemKbytes;
   }
 
 }
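
The event now keeps plain Java fields and only materializes the Avro datum
on the first getDatum() call; setDatum() performs the reverse mapping, so a
parsed event serves the same accessors without further Avro conversions. A
small sketch of constructing one, with made-up identifiers and no progress
splits:

    import org.apache.hadoop.mapred.TaskStatus;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;

    public class EventRoundTrip {
      public static void main(String[] args) {
        TaskAttemptID attempt =
            TaskAttemptID.forName("attempt_201303070001_0001_m_000000_0");
        TaskAttemptUnsuccessfulCompletionEvent event =
            new TaskAttemptUnsuccessfulCompletionEvent(
                attempt, TaskType.MAP, TaskStatus.State.FAILED.toString(),
                System.currentTimeMillis(), "host1", 1234, "/default-rack",
                "disk error", new Counters(), null);
        // the Avro record is built here, on first access, not in the constructor
        System.out.println(event.getDatum());
      }
    }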

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Thu Mar  7 02:57:40 2013
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -35,7 +34,17 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskFailedEvent implements HistoryEvent {
-  private TaskFailed datum = new TaskFailed();
+  private TaskFailed datum = null;
+
+  private TaskAttemptID failedDueToAttempt;
+  private TaskID id;
+  private TaskType taskType;
+  private long finishTime;
+  private String status;
+  private String error;
+  private Counters counters;
+
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /**
    * Create an event to record task failure
@@ -45,45 +54,87 @@ public class TaskFailedEvent implements 
    * @param error Error String
    * @param status Status
    * @param failedDueToAttempt The attempt id due to which the task failed
+   * @param counters Counters for the task
    */
   public TaskFailedEvent(TaskID id, long finishTime, 
       TaskType taskType, String error, String status,
-      TaskAttemptID failedDueToAttempt) {
-    datum.taskid = new Utf8(id.toString());
-    datum.error = new Utf8(error);
-    datum.finishTime = finishTime;
-    datum.taskType = new Utf8(taskType.name());
-    datum.failedDueToAttempt = failedDueToAttempt == null
-      ? null
-      : new Utf8(failedDueToAttempt.toString());
-    datum.status = new Utf8(status);
+      TaskAttemptID failedDueToAttempt, Counters counters) {
+    this.id = id;
+    this.finishTime = finishTime;
+    this.taskType = taskType;
+    this.error = error;
+    this.status = status;
+    this.failedDueToAttempt = failedDueToAttempt;
+    this.counters = counters;
   }
 
+  public TaskFailedEvent(TaskID id, long finishTime,
+      TaskType taskType, String error, String status,
+      TaskAttemptID failedDueToAttempt) {
+    this(id, finishTime, taskType, error, status,
+        failedDueToAttempt, EMPTY_COUNTERS);
+  }
+  
   TaskFailedEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
+  public Object getDatum() {
+    if (datum == null) {
+      datum = new TaskFailed();
+      datum.taskid = new Utf8(id.toString());
+      datum.error = new Utf8(error);
+      datum.finishTime = finishTime;
+      datum.taskType = new Utf8(taskType.name());
+      datum.failedDueToAttempt =
+          failedDueToAttempt == null
+          ? null
+          : new Utf8(failedDueToAttempt.toString());
+      datum.status = new Utf8(status);
+      datum.counters = EventWriter.toAvro(counters);
+    }
+    return datum;
+  }
+  
+  public void setDatum(Object odatum) {
+    this.datum = (TaskFailed)odatum;
+    this.id =
+        TaskID.forName(datum.taskid.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.error = datum.error.toString();
+    this.failedDueToAttempt =
+        datum.failedDueToAttempt == null
+        ? null
+        : TaskAttemptID.forName(
+            datum.failedDueToAttempt.toString());
+    this.status = datum.status.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+  }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() { return id; }
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error; }
   /** Get the finish time of the attempt */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return finishTime; }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return taskType;
   }
   /** Get the attempt id due to which the task failed */
   public TaskAttemptID getFailedAttemptID() {
-    return datum.failedDueToAttempt == null
-      ? null
-      : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
+    return failedDueToAttempt;
   }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() { return status; }
+  /** Get task counters */
+  public Counters getCounters() { return counters; }
   /** Get the event type */
-  public EventType getEventType() { return EventType.TASK_FAILED; }
+  public EventType getEventType() { return EventType.TASK_FAILED; }
 
-  
 }
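
TaskFailedEvent follows the same lazy-datum pattern, and callers that do
not track counters can keep using the old six-argument constructor, which
substitutes EMPTY_COUNTERS. A sketch of emitting one with counters, using
made-up identifiers:

    import org.apache.hadoop.mapred.TaskStatus;
    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskAttemptID;
    import org.apache.hadoop.mapreduce.TaskID;
    import org.apache.hadoop.mapreduce.TaskType;
    import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;

    public class EmitTaskFailed {
      public static void main(String[] args) {
        TaskID taskId = TaskID.forName("task_201303070001_0001_m_000000");
        TaskAttemptID lastAttempt =
            TaskAttemptID.forName("attempt_201303070001_0001_m_000000_3");
        TaskFailedEvent event = new TaskFailedEvent(
            taskId, System.currentTimeMillis(), TaskType.MAP,
            "too many fetch failures", TaskStatus.State.FAILED.toString(),
            lastAttempt, new Counters());
        // as above, the Avro TaskFailed record is created on demand
        System.out.println(event.getDatum());
      }
    }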

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Thu Mar  7 02:57:40 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
  * {@link InputFormat#getSplits(JobContext)} method. 
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
 @InterfaceStability.Stable
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
-
+  
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
   @Override
   public List<InputSplit> getSplits(JobContext job) 
     throws IOException {
-
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
                                  rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
+    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
+                 maxSize, minSizeNode, minSizeRack, splits);
+  }
 
+  @VisibleForTesting
+  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                     HashMap<OneBlockInfo, String[]> blockToNodes,
+                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                     long totLength,
+                     long maxSize,
+                     long minSizeNode,
+                     long minSizeRack,
+                     List<InputSplit> splits) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
     Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
+    
+    int numNodes = nodeToBlocks.size();
+    long totalLength = totLength;
 
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+    while (true) {
+      // maxSize may legitimately be 0; disable load smoothing in that case
+      int avgSplitsPerNode = (maxSize > 0 && numNodes > 0)
+          ? ((int) (totalLength / maxSize)) / numNodes
+          : Integer.MAX_VALUE;
+      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+      numNodes = 0;
+
+      // process all nodes and create splits that are local to a node.
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        nodes.add(one.getKey());
+        List<OneBlockInfo> blocksInNode = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        int splitsInNode = 0;
+        for (OneBlockInfo oneblock : blocksInNode) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, nodes, validBlocks);
+              totalLength -= curSplitSize;
+              curSplitSize = 0;
+              validBlocks.clear();
+              splitsInNode++;
+              if (splitsInNode == maxSplitsByNodeOnly) {
+                // stop grouping on a node so as not to create
+                // disproportionately more splits on a node because it happens
+                // to have many blocks
+                // consider only these nodes in next round of grouping because
+                // they have leftover blocks that may need to be grouped
+                numNodes++;
+                break;
+              }
+            }
           }
         }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the 
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // if there were any blocks left over and their combined size is
+        // larger than minSplitNode, then combine them into one split.
+        // Otherwise add them back to the unprocessed pool. It is likely
+        // that they will be combined with other blocks from the
+        // same rack later on.
+        if (minSizeNode != 0 && curSplitSize >= minSizeNode
+            && splitsInNode == 0) {
+          // no split has been created on this node yet, so it is ok to add
+          // a smaller one here for parallelism; otherwise leave the blocks
+          // for rack-level grouping to get balanced split sizes
+          // create an input split and add it to the splits array
+          addCreatedSplit(splits, nodes, validBlocks);
+          totalLength -= curSplitSize;
+        } else {
+          for (OneBlockInfo oneblock : validBlocks) {
+            blockToNodes.put(oneblock, oneblock.hosts);
+          }
         }
+        validBlocks.clear();
+        nodes.clear();
+        curSplitSize = 0;
+      }
+      
+      if (!(numNodes > 0 && totalLength > 0)) {
+        break;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
     }
 
     // if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
       offset[i] = validBlocks.get(i).offset;
       length[i] = validBlocks.get(i).length;
     }
-
      // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                    length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one file from the File System
    */
-  private static class OneFileInfo {
+  @VisibleForTesting
+  static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
           }
           blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
         }
+        
+        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                          nodeToBlocks, rackToNodes);
+      }
+    }
+    
+    @VisibleForTesting
+    static void populateBlockInfo(OneBlockInfo[] blocks,
+                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                          HashMap<OneBlockInfo, String[]> blockToNodes,
+                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                          HashMap<String, Set<String>> rackToNodes) {
+      for (OneBlockInfo oneblock : blocks) {
+        // add this block to the block --> node locations map
+        blockToNodes.put(oneblock, oneblock.hosts);
+
+        // For blocks that do not have host/rack information,
+        // assign to default rack.
+        String[] racks = null;
+        if (oneblock.hosts.length == 0) {
+          racks = new String[]{NetworkTopology.DEFAULT_RACK};
+        } else {
+          racks = oneblock.racks;
+        }
 
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // For blocks that do not have host/rack information,
-          // assign to default  rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
+        // add this block to the rack --> block map
+        for (int j = 0; j < racks.length; j++) {
+          String rack = racks[j];
+          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            rackToBlocks.put(rack, blklist);
           }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
-            }
+          blklist.add(oneblock);
+          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
           }
+        }
 
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
+        // add this block to the node --> block map
+        for (int j = 0; j < oneblock.hosts.length; j++) {
+          String node = oneblock.hosts[j];
+          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            nodeToBlocks.put(node, blklist);
           }
+          blklist.add(oneblock);
         }
       }
     }
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one block from the File System
    */
-  private static class OneBlockInfo {
+  @VisibleForTesting
+  static class OneBlockInfo {
     Path onepath;                // name of this file
     long offset;                 // offset in file
     long length;                 // length of this block
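
The reworked createSplits() runs in rounds: each round caps the number of
node-local splits a single node may emit at roughly
(remaining bytes / maxSize) / (remaining nodes), so a node that happens to
hold many blocks no longer produces a disproportionate share of splits. A
sketch of the configuration knobs involved, assuming the standard
max-split-size key; the values are arbitrary:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class CombineSplitTuning {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // cap on combined split size; 0 (the default) disables the per-node cap
        conf.setLong(FileInputFormat.SPLIT_MAXSIZE, 256L << 20);
        // node-local leftovers below this size fall through to rack grouping
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERNODE, 64L << 20);
        conf.setLong(CombineFileInputFormat.SPLIT_MINSIZE_PERRACK, 128L << 20);
        System.out.println(conf.get(FileInputFormat.SPLIT_MAXSIZE));
      }
    }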

Modified: hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1453669&r1=1453668&r2=1453669&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (original)
+++ hadoop/common/branches/HDFS-2802/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Thu Mar  7 02:57:40 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.spli
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,9 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * A utility that reads the split meta info and creates
@@ -44,8 +44,8 @@ public class SplitMetaInfoReader {
   public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
       JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
   throws IOException {
-    long maxMetaInfoSize = conf.getLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 
-        10000000L);
+    long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
     Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
     String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
     FileStatus fStatus = fs.getFileStatus(metaSplitFile);
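
The reader now takes its limit from MRJobConfig instead of the
JobTracker-era JTConfig key, keeping the same 10000000-byte default. A
sketch of raising the limit for a job whose split meta info is unusually
large; the value shown is arbitrary, and the reader enforces the limit only
when the configured value is positive:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SplitMetaInfoLimit {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // default: MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE (10000000 bytes)
        conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, 100L * 1000 * 1000);
        System.out.println(
            conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L));
      }
    }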


