hadoop-mapreduce-commits mailing list archives

From: sur...@apache.org
Subject: svn commit: r1451695 - in /hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-ma...
Date: Fri, 01 Mar 2013 19:37:09 GMT
Author: suresh
Date: Fri Mar  1 19:37:03 2013
New Revision: 1451695

URL: http://svn.apache.org/r1451695
Log:
Merge trunk to branch-trunk-win

Modified:
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/bin/mapred
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
    hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/pom.xml

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1448457-1451693

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt Fri Mar  1 19:37:03 2013
@@ -163,6 +163,12 @@ Release 2.0.4-beta - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-5033. mapred shell script should respect usage flags
+    (--help -help -h). (Andrew Wang via atm)
+
+    MAPREDUCE-4892. Modify CombineFileInputFormat to not skew input splits'
+    allocation on small clusters. (Bikas Saha via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -177,6 +183,24 @@ Release 2.0.4-beta - UNRELEASED
 
     MAPREDUCE-4994. Addendum fixing testcases failures. (sandyr via tucu)
 
+    MAPREDUCE-4846. Some JobQueueInfo methods are public in MR1 but protected
+    in MR2. (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors
+    from MR1. (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-4951. Container preemption interpreted as task failure.
+    (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-5008. Merger progress miscounts with respect to EOF_MARKER.
+    (Sandy Ryza via tomwhite)
+
+    MAPREDUCE-4693. History server should include counters for failed tasks.
+    (Xuan Gong via sseth)
+
+    MAPREDUCE-4896. mapred queue -info spits out ugly exception when queue does 
+    not exist. (sandyr via tucu)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES
@@ -726,6 +750,9 @@ Release 0.23.7 - UNRELEASED
     MAPREDUCE-5009. Killing the Task Attempt slated for commit does not clear
     the value from the Task commitAttempt member (Robert Parker via jeagles)
 
+    MAPREDUCE-4871. AM uses mapreduce.jobtracker.split.metainfo.maxsize but
+    mapred-default has mapreduce.job.split.metainfo.maxsize (Jason Lowe via
+    jeagles)
 
 Release 0.23.6 - UNRELEASED
 

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1448457-1451693

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/bin/mapred
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/bin/mapred?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/bin/mapred (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/bin/mapred Fri Mar  1 19:37:03 2013
@@ -50,6 +50,14 @@ fi
 COMMAND=$1
 shift
 
+case $COMMAND in
+  # usage flags
+  --help|-help|-h)
+    print_usage
+    exit
+    ;;
+esac
+
 if [ "$COMMAND" = "job" ] ; then
   CLASS=org.apache.hadoop.mapred.JobClient
 elif [ "$COMMAND" = "queue" ] ; then

Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1448457-1451693

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Fri Mar  1 19:37:03 2013
@@ -238,7 +238,6 @@ public abstract class TaskAttemptImpl im
          TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_CONTAINER_COMPLETED,
          CLEANUP_CONTAINER_TRANSITION)
-      // ^ If RM kills the container due to expiry, preemption etc. 
      .addTransition(TaskAttemptStateInternal.ASSIGNED, 
          TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
          TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
@@ -1184,7 +1183,8 @@ public abstract class TaskAttemptImpl im
             taskAttempt.nodeRackName == null ? "UNKNOWN" 
                 : taskAttempt.nodeRackName,
             StringUtils.join(
-                LINE_SEPARATOR, taskAttempt.getDiagnostics()), taskAttempt
+                LINE_SEPARATOR, taskAttempt.getDiagnostics()),
+                taskAttempt.getCounters(), taskAttempt
                 .getProgressSplitBlock().burst());
     return tauce;
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Fri Mar  1 19:37:03 2013
@@ -730,7 +730,8 @@ public abstract class TaskImpl implement
         TypeConverter.fromYarn(task.getType()),
         errorSb.toString(),
         taskState.toString(),
-        taId == null ? null : TypeConverter.fromYarn(taId));
+        taId == null ? null : TypeConverter.fromYarn(taId),
+        task.getCounters());
     return taskFailedEvent;
   }
   

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Mar  1 19:37:03 2013
@@ -67,9 +67,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.util.RackResolver;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Allocates the container from the ResourceManager scheduler.
  */
@@ -606,8 +609,8 @@ public class RMContainerAllocator extend
         assignedRequests.remove(attemptID);
         
         // send the container completed event to Task attempt
-        eventHandler.handle(new TaskAttemptEvent(attemptID,
-            TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+        eventHandler.handle(createContainerFinishedEvent(cont, attemptID));
+        
         // Send the diagnostics
         String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
         eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
@@ -617,6 +620,19 @@ public class RMContainerAllocator extend
     return newContainers;
   }
   
+  @VisibleForTesting
+  public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont,
+      TaskAttemptId attemptID) {
+    if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) {
+      // killed by framework
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_KILL);
+    } else {
+      return new TaskAttemptEvent(attemptID,
+          TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    }
+  }
+  
   @SuppressWarnings("unchecked")
   private void handleUpdatedNodes(AMResponse response) {
     // send event to the job about on updated nodes

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Mar  1 19:37:03 2013
@@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.Event;
@@ -1645,6 +1646,32 @@ public class TestRMContainerAllocator {
     Assert.assertTrue(callbackCalled.get());
   }
 
+  @Test
+  public void testCompletedContainerEvent() {
+    RMContainerAllocator allocator = new RMContainerAllocator(
+        mock(ClientService.class), mock(AppContext.class));
+    
+    TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(
+        MRBuilderUtils.newTaskId(
+            MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1);
+    ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1);
+    ContainerStatus status = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "", 0);
+
+    ContainerStatus abortedStatus = BuilderUtils.newContainerStatus(
+        containerId, ContainerState.RUNNING, "",
+        YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS);
+    
+    TaskAttemptEvent event = allocator.createContainerFinishedEvent(status,
+        attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+        event.getType());
+    
+    TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent(
+        abortedStatus, attemptId);
+    Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
+  }
+  
   public static void main(String[] args) throws Exception {
     TestRMContainerAllocator t = new TestRMContainerAllocator();
     t.testSimple();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr Fri Mar  1 19:37:03 2013
@@ -212,6 +212,7 @@
           {"name": "rackname", "type": "string"},
           {"name": "status", "type": "string"},
           {"name": "error", "type": "string"},
+          {"name": "counters", "type": "JhCounters"},
           {"name": "clockSplits", "type": { "type": "array", "items": "int"}},
           {"name": "cpuUsages", "type": { "type": "array", "items": "int"}},
           {"name": "vMemKbytes", "type": { "type": "array", "items": "int"}},
@@ -226,7 +227,8 @@
           {"name": "finishTime", "type": "long"},
           {"name": "error", "type": "string"},
           {"name": "failedDueToAttempt", "type": ["null", "string"] },
-          {"name": "status", "type": "string"}
+          {"name": "status", "type": "string"},
+          {"name": "counters", "type": "JhCounters"}
       ]
      },
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Fri Mar  1 19:37:03 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.mapreduce.MRJob
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.util.ConfigUtil;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.util.ClassUtil;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.log4j.Level;
@@ -453,7 +454,7 @@ public class JobConf extends Configurati
    * @param cls the example class.
    */
   public void setJarByClass(Class cls) {
-    String jar = findContainingJar(cls);
+    String jar = ClassUtil.findContainingJar(cls);
     if (jar != null) {
       setJar(jar);
     }   
@@ -1811,7 +1812,7 @@ public class JobConf extends Configurati
     return 
     (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
   }
-  
+
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
@@ -1822,35 +1823,9 @@ public class JobConf extends Configurati
    * @throws IOException
    */
   public static String findContainingJar(Class my_class) {
-    ClassLoader loader = my_class.getClassLoader();
-    String class_file = my_class.getName().replaceAll("\\.", "/") + ".class";
-    try {
-      for(Enumeration itr = loader.getResources(class_file);
-          itr.hasMoreElements();) {
-        URL url = (URL) itr.nextElement();
-        if ("jar".equals(url.getProtocol())) {
-          String toReturn = url.getPath();
-          if (toReturn.startsWith("file:")) {
-            toReturn = toReturn.substring("file:".length());
-          }
-          // URLDecoder is a misnamed class, since it actually decodes
-          // x-www-form-urlencoded MIME type rather than actual
-          // URL encoding (which the file path has). Therefore it would
-          // decode +s to ' 's which is incorrect (spaces are actually
-          // either unencoded or encoded as "%20"). Replace +s first, so
-          // that they are kept sacred during the decoding process.
-          toReturn = toReturn.replaceAll("\\+", "%2B");
-          toReturn = URLDecoder.decode(toReturn, "UTF-8");
-          return toReturn.replaceAll("!.*$", "");
-        }
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-    return null;
+    return ClassUtil.findContainingJar(my_class);
   }
 
-
   /**
    * Get the memory required to run a task of this job, in bytes. See
    * {@link #MAPRED_TASK_MAXVMEM_PROPERTY}
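
The hunk above replaces JobConf's inlined jar lookup with a delegation to the shared org.apache.hadoop.util.ClassUtil helper. A minimal sketch of calling the shared utility directly (the class name FindJarExample is illustrative):

    import org.apache.hadoop.util.ClassUtil;

    public class FindJarExample {
      public static void main(String[] args) {
        // Returns the path of the jar that contains the class, or null when
        // the class was loaded from a directory rather than a jar.
        String jar = ClassUtil.findContainingJar(FindJarExample.class);
        System.out.println(jar);
      }
    }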

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobQueueInfo.java Fri Mar  1 19:37:03 2013
@@ -67,7 +67,8 @@ public class JobQueueInfo extends QueueI
    * 
    * @param queueName Name of the job queue.
    */
-  protected void setQueueName(String queueName) {
+  @InterfaceAudience.Private
+  public void setQueueName(String queueName) {
     super.setQueueName(queueName);
   }
 
@@ -76,7 +77,8 @@ public class JobQueueInfo extends QueueI
    * 
    * @param schedulingInfo
    */
-  protected void setSchedulingInfo(String schedulingInfo) {
+  @InterfaceAudience.Private
+  public void setSchedulingInfo(String schedulingInfo) {
     super.setSchedulingInfo(schedulingInfo);
   }
 
@@ -84,15 +86,21 @@ public class JobQueueInfo extends QueueI
    * Set the state of the queue
    * @param state state of the queue.
    */
-  protected void setQueueState(String state) {
+  @InterfaceAudience.Private
+  public void setQueueState(String state) {
     super.setState(QueueState.getState(state));
   }
   
-  String getQueueState() {
+  /**
+   * Use getState() instead
+   */
+  @Deprecated
+  public String getQueueState() {
     return super.getState().toString();
   }
   
-  protected void setChildren(List<JobQueueInfo> children) {
+  @InterfaceAudience.Private
+  public void setChildren(List<JobQueueInfo> children) {
     List<QueueInfo> list = new ArrayList<QueueInfo>();
     for (JobQueueInfo q : children) {
       list.add(q);
@@ -108,7 +116,8 @@ public class JobQueueInfo extends QueueI
     return list;
   }
 
-  protected void setProperties(Properties props) {
+  @InterfaceAudience.Private
+  public void setProperties(Properties props) {
     super.setProperties(props);
   }
 
@@ -141,7 +150,8 @@ public class JobQueueInfo extends QueueI
     setChildren(children);
   }
 
-  protected void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
+  @InterfaceAudience.Private
+  public void setJobStatuses(org.apache.hadoop.mapreduce.JobStatus[] stats) {
     super.setJobStatuses(stats);
   }
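
Per the MAPREDUCE-4846 entry in CHANGES.txt, the setters above are public again (as in MR1) but annotated @InterfaceAudience.Private, and getQueueState() is deprecated in favor of the typed getState(). A hedged sketch of the preferred accessors (the queue name is illustrative):

    import org.apache.hadoop.mapred.JobQueueInfo;
    import org.apache.hadoop.mapreduce.QueueState;

    public class QueueStateExample {
      public static void main(String[] args) {
        JobQueueInfo info = new JobQueueInfo();
        info.setQueueName("default");  // public again, MR1-style
        info.setQueueState(QueueState.RUNNING.getStateName());
        // Prefer the typed accessor; getQueueState() is now @Deprecated.
        QueueState state = info.getState();
        System.out.println(info.getQueueName() + " is " + state);
      }
    }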
 

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobStatus.java Fri Mar  1 19:37:03 2013
@@ -77,6 +77,59 @@ public class JobStatus extends org.apach
    */
   public JobStatus() {
   }
+  
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, null,
+        null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      int runState) {
+    this (jobid, mapProgress, reduceProgress, runState, null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float mapProgress, float reduceProgress,
+      float cleanupProgress, int runState, JobPriority jp) {
+    this(jobid, mapProgress, reduceProgress, cleanupProgress, runState, jp,
+        null, null, null, null);
+  }
+
+  /**
+   * Create a job status object for a given jobid.
+   * @param jobid The jobid of the job
+   * @param setupProgress The progress made on the setup
+   * @param mapProgress The progress made on the maps
+   * @param reduceProgress The progress made on the reduces
+   * @param cleanupProgress The progress made on the cleanup
+   * @param runState The current state of the job
+   * @param jp Priority of the job.
+   */
+  @Deprecated
+  public JobStatus(JobID jobid, float setupProgress, float mapProgress,
+      float reduceProgress, float cleanupProgress, 
+      int runState, JobPriority jp) {
+    this(jobid, setupProgress, mapProgress, reduceProgress, cleanupProgress,
+        runState, jp, null, null, null, null);
+  }
 
   /**
    * Create a job status object for a given jobid.
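
The block above restores MR1-era JobStatus constructors (MAPREDUCE-5013), each marked @Deprecated and delegating to the fuller MR2 forms with null user/scheduling fields. A hedged usage sketch (the job ID values are illustrative):

    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.JobStatus;

    public class JobStatusCompat {
      public static void main(String[] args) {
        JobID id = new JobID("201303011937", 1);
        // MR1-style four-argument constructor restored by this commit.
        @SuppressWarnings("deprecation")
        JobStatus status = new JobStatus(id, 0.5f, 0.0f, JobStatus.RUNNING);
        System.out.println(status.getJobID() + " map=" + status.mapProgress());
      }
    }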

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/QueueConfigurationParser.java Fri Mar  1 19:37:03 2013
@@ -449,7 +449,7 @@ class QueueConfigurationParser {
     q.appendChild(propsElement);
 
     // Queue-state
-    String queueState = jqi.getQueueState();
+    String queueState = jqi.getState().getStateName();
     if (queueState != null
         && !queueState.equals(QueueState.UNDEFINED.getStateName())) {
       Element qStateElement = document.createElement(STATE_TAG);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Fri Mar  1 19:37:03 2013
@@ -63,6 +63,9 @@ public interface MRJobConfig {
 
   public static final String SPLIT_FILE = "mapreduce.job.splitfile";
 
+  public static final String SPLIT_METAINFO_MAXSIZE = "mapreduce.job.split.metainfo.maxsize";
+  public static final long DEFAULT_SPLIT_METAINFO_MAXSIZE = 10000000L;
+
   public static final String NUM_MAPS = "mapreduce.job.maps";
 
   public static final String MAX_TASK_FAILURES_PER_TRACKER = "mapreduce.job.maxtaskfailures.per.tracker";
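
These constants back the MAPREDUCE-4871 fix noted in CHANGES.txt: the AM and mapred-default.xml now agree on mapreduce.job.split.metainfo.maxsize. A minimal sketch of reading the limit through the new constants:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SplitMetaInfoLimit {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the 10000000L default added above when unset.
        long max = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
            MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
        System.out.println(MRJobConfig.SPLIT_METAINFO_MAXSIZE + " = " + max);
      }
    }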

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Fri Mar  1 19:37:03 2013
@@ -295,6 +295,7 @@ public class JobHistoryParser implements
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();
+    attemptInfo.counters = event.getCounters();
     if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
     {
       //this is a successful task
@@ -347,6 +348,7 @@ public class JobHistoryParser implements
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.error = StringInterner.weakIntern(event.getError());
     taskInfo.failedDueToAttemptId = event.getFailedAttemptID();
+    taskInfo.counters = event.getCounters();
     info.errorInfo = "Task " + taskInfo.taskId +" failed " +
     taskInfo.attemptsMap.size() + " times ";
   }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptUnsuccessfulCompletionEvent.java Fri Mar  1 19:37:03 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapreduce.jobh
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -36,8 +37,24 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskAttemptUnsuccessfulCompletionEvent implements HistoryEvent {
-  private TaskAttemptUnsuccessfulCompletion datum =
-    new TaskAttemptUnsuccessfulCompletion();
+
+  private TaskAttemptUnsuccessfulCompletion datum = null;
+
+  private TaskAttemptID attemptId;
+  private TaskType taskType;
+  private String status;
+  private long finishTime;
+  private String hostname;
+  private int port;
+  private String rackName;
+  private String error;
+  private Counters counters;
+  int[][] allSplits;
+  int[] clockSplits;
+  int[] cpuUsages;
+  int[] vMemKbytes;
+  int[] physMemKbytes;
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /** 
    * Create an event to record the unsuccessful completion of attempts
@@ -49,6 +66,7 @@ public class TaskAttemptUnsuccessfulComp
    * @param port rpc port for for the tracker
    * @param rackName Name of the rack where the attempt executed
    * @param error Error string
+   * @param counters Counters for the attempt
    * @param allSplits the "splits", or a pixelated graph of various
    *        measurable worker node state variables against progress.
    *        Currently there are four; wallclock time, CPU time,
@@ -58,31 +76,25 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime,
         String hostname, int port, String rackName,
-        String error, int[][] allSplits) {
-    datum.taskid = new Utf8(id.getTaskID().toString());
-    datum.taskType = new Utf8(taskType.name());
-    datum.attemptId = new Utf8(id.toString());
-    datum.finishTime = finishTime;
-    datum.hostname = new Utf8(hostname);
-    if (rackName != null) {
-      datum.rackname = new Utf8(rackName);
-    }
-    datum.port = port;
-    datum.error = new Utf8(error);
-    datum.status = new Utf8(status);
-
-    datum.clockSplits 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetWallclockTime(allSplits));
-    datum.cpuUsages 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetCPUTime(allSplits));
-    datum.vMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetVMemKbytes(allSplits));
-    datum.physMemKbytes 
-      = AvroArrayUtils.toAvro
-           (ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits));
+        String error, Counters counters, int[][] allSplits) {
+    this.attemptId = id;
+    this.taskType = taskType;
+    this.status = status;
+    this.finishTime = finishTime;
+    this.hostname = hostname;
+    this.port = port;
+    this.rackName = rackName;
+    this.error = error;
+    this.counters = counters;
+    this.allSplits = allSplits;
+    this.clockSplits =
+        ProgressSplitsBlock.arrayGetWallclockTime(allSplits);
+    this.cpuUsages =
+        ProgressSplitsBlock.arrayGetCPUTime(allSplits);
+    this.vMemKbytes =
+        ProgressSplitsBlock.arrayGetVMemKbytes(allSplits);
+    this.physMemKbytes =
+        ProgressSplitsBlock.arrayGetPhysMemKbytes(allSplits);
   }
 
   /** 
@@ -103,42 +115,109 @@ public class TaskAttemptUnsuccessfulComp
        (TaskAttemptID id, TaskType taskType,
         String status, long finishTime, 
         String hostname, String error) {
-    this(id, taskType, status, finishTime, hostname, -1, "", error, null);
+    this(id, taskType, status, finishTime, hostname, -1, "",
+        error, EMPTY_COUNTERS, null);
+  }
+  
+  public TaskAttemptUnsuccessfulCompletionEvent
+      (TaskAttemptID id, TaskType taskType,
+       String status, long finishTime,
+       String hostname, int port, String rackName,
+       String error, int[][] allSplits) {
+    this(id, taskType, status, finishTime, hostname, port,
+        rackName, error, EMPTY_COUNTERS, null);
   }
 
   TaskAttemptUnsuccessfulCompletionEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) {
-    this.datum = (TaskAttemptUnsuccessfulCompletion)datum;
+  public Object getDatum() {
+    if(datum == null) {
+      datum = new TaskAttemptUnsuccessfulCompletion();
+      datum.taskid = new Utf8(attemptId.getTaskID().toString());
+      datum.taskType = new Utf8(taskType.name());
+      datum.attemptId = new Utf8(attemptId.toString());
+      datum.finishTime = finishTime;
+      datum.hostname = new Utf8(hostname);
+      if (rackName != null) {
+        datum.rackname = new Utf8(rackName);
+      }
+      datum.port = port;
+      datum.error = new Utf8(error);
+      datum.status = new Utf8(status);
+
+      datum.counters = EventWriter.toAvro(counters);
+
+      datum.clockSplits = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetWallclockTime(allSplits));
+      datum.cpuUsages = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetCPUTime(allSplits));
+      datum.vMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetVMemKbytes(allSplits));
+      datum.physMemKbytes = AvroArrayUtils.toAvro(ProgressSplitsBlock
+          .arrayGetPhysMemKbytes(allSplits));
+    }
+    return datum;
+  }
+  
+  
+  
+  public void setDatum(Object odatum) {
+    this.datum =
+        (TaskAttemptUnsuccessfulCompletion)odatum;
+    this.attemptId =
+        TaskAttemptID.forName(datum.attemptId.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.hostname = datum.hostname.toString();
+    this.rackName = datum.rackname.toString();
+    this.port = datum.port;
+    this.status = datum.status.toString();
+    this.error = datum.error.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+    this.clockSplits =
+        AvroArrayUtils.fromAvro(datum.clockSplits);
+    this.cpuUsages =
+        AvroArrayUtils.fromAvro(datum.cpuUsages);
+    this.vMemKbytes =
+        AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    this.physMemKbytes =
+        AvroArrayUtils.fromAvro(datum.physMemKbytes);
   }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() {
+    return attemptId.getTaskID();
+  }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return TaskType.valueOf(taskType.toString());
   }
   /** Get the attempt id */
   public TaskAttemptID getTaskAttemptId() {
-    return TaskAttemptID.forName(datum.attemptId.toString());
+    return attemptId;
   }
   /** Get the finish time */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() { return finishTime; }
   /** Get the name of the host where the attempt executed */
-  public String getHostname() { return datum.hostname.toString(); }
+  public String getHostname() { return hostname; }
   /** Get the rpc port for the host where the attempt executed */
-  public int getPort() { return datum.port; }
+  public int getPort() { return port; }
   
   /** Get the rack name of the node where the attempt ran */
   public String getRackName() {
-    return datum.rackname == null ? null : datum.rackname.toString();
+    return rackName == null ? null : rackName.toString();
   }
   
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error.toString(); }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() {
+    return status.toString();
+  }
+  /** Get the counters */
+  Counters getCounters() { return counters; }
   /** Get the event type */
   public EventType getEventType() {
     // Note that the task type can be setup/map/reduce/cleanup but the 
@@ -157,16 +236,16 @@ public class TaskAttemptUnsuccessfulComp
 
 
   public int[] getClockSplits() {
-    return AvroArrayUtils.fromAvro(datum.clockSplits);
+    return clockSplits;
   }
   public int[] getCpuUsages() {
-    return AvroArrayUtils.fromAvro(datum.cpuUsages);
+    return cpuUsages;
   }
   public int[] getVMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.vMemKbytes);
+    return vMemKbytes;
   }
   public int[] getPhysMemKbytes() {
-    return AvroArrayUtils.fromAvro(datum.physMemKbytes);
+    return physMemKbytes;
   }
 
 }
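
The rewrite above (mirrored in TaskFailedEvent below) moves the event from eagerly populating an Avro record in the constructor to keeping plain Java fields and building the record only when getDatum() is first called, so events that are never written to the history file never pay for Avro object construction. A minimal sketch of the pattern, with hypothetical names (LazyEvent and LazyRecord are not Hadoop classes):

    import org.apache.avro.util.Utf8;

    public class LazyEvent {
      // Stand-in for an Avro-generated record such as
      // TaskAttemptUnsuccessfulCompletion; only the shape matters here.
      static class LazyRecord { CharSequence error; }

      private LazyRecord datum;    // built on first use, not in the constructor
      private final String error; // cheap plain field set up front

      public LazyEvent(String error) { this.error = error; }

      public Object getDatum() {
        if (datum == null) {       // materialize only when serialized
          datum = new LazyRecord();
          datum.error = new Utf8(error);
        }
        return datum;
      }
    }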

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFailedEvent.java Fri Mar  1 19:37:03 2013
@@ -18,10 +18,9 @@
 
 package org.apache.hadoop.mapreduce.jobhistory;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -35,7 +34,17 @@ import org.apache.avro.util.Utf8;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TaskFailedEvent implements HistoryEvent {
-  private TaskFailed datum = new TaskFailed();
+  private TaskFailed datum = null;
+
+  private TaskAttemptID failedDueToAttempt;
+  private TaskID id;
+  private TaskType taskType;
+  private long finishTime;
+  private String status;
+  private String error;
+  private Counters counters;
+
+  private static final Counters EMPTY_COUNTERS = new Counters();
 
   /**
    * Create an event to record task failure
@@ -45,45 +54,87 @@ public class TaskFailedEvent implements 
    * @param error Error String
    * @param status Status
    * @param failedDueToAttempt The attempt id due to which the task failed
+   * @param counters Counters for the task
    */
   public TaskFailedEvent(TaskID id, long finishTime, 
       TaskType taskType, String error, String status,
-      TaskAttemptID failedDueToAttempt) {
-    datum.taskid = new Utf8(id.toString());
-    datum.error = new Utf8(error);
-    datum.finishTime = finishTime;
-    datum.taskType = new Utf8(taskType.name());
-    datum.failedDueToAttempt = failedDueToAttempt == null
-      ? null
-      : new Utf8(failedDueToAttempt.toString());
-    datum.status = new Utf8(status);
+      TaskAttemptID failedDueToAttempt, Counters counters) {
+    this.id = id;
+    this.finishTime = finishTime;
+    this.taskType = taskType;
+    this.error = error;
+    this.status = status;
+    this.failedDueToAttempt = failedDueToAttempt;
+    this.counters = counters;
   }
 
+  public TaskFailedEvent(TaskID id, long finishTime, 
+	      TaskType taskType, String error, String status,
+	      TaskAttemptID failedDueToAttempt) {
+    this(id, finishTime, taskType, error, status,
+        failedDueToAttempt, EMPTY_COUNTERS);
+  }
+  
   TaskFailedEvent() {}
 
-  public Object getDatum() { return datum; }
-  public void setDatum(Object datum) { this.datum = (TaskFailed)datum; }
+  public Object getDatum() {
+    if(datum == null) {
+      datum = new TaskFailed();
+      datum.taskid = new Utf8(id.toString());
+      datum.error = new Utf8(error);
+      datum.finishTime = finishTime;
+      datum.taskType = new Utf8(taskType.name());
+      datum.failedDueToAttempt =
+          failedDueToAttempt == null
+          ? null
+          : new Utf8(failedDueToAttempt.toString());
+      datum.status = new Utf8(status);
+      datum.counters = EventWriter.toAvro(counters);
+    }
+    return datum;
+  }
+  
+  public void setDatum(Object odatum) {
+    this.datum = (TaskFailed)odatum;
+    this.id =
+        TaskID.forName(datum.taskid.toString());
+    this.taskType =
+        TaskType.valueOf(datum.taskType.toString());
+    this.finishTime = datum.finishTime;
+    this.error = datum.error.toString();
+    this.failedDueToAttempt =
+        datum.failedDueToAttempt == null
+        ? null
+        : TaskAttemptID.forName(
+            datum.failedDueToAttempt.toString());
+    this.status = datum.status.toString();
+    this.counters =
+        EventReader.fromAvro(datum.counters);
+  }
 
   /** Get the task id */
-  public TaskID getTaskId() { return TaskID.forName(datum.taskid.toString()); }
+  public TaskID getTaskId() { return id; }
   /** Get the error string */
-  public String getError() { return datum.error.toString(); }
+  public String getError() { return error; }
   /** Get the finish time of the attempt */
-  public long getFinishTime() { return datum.finishTime; }
+  public long getFinishTime() {
+    return finishTime;
+  }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(datum.taskType.toString());
+    return taskType;
   }
   /** Get the attempt id due to which the task failed */
   public TaskAttemptID getFailedAttemptID() {
-    return datum.failedDueToAttempt == null
-      ? null
-      : TaskAttemptID.forName(datum.failedDueToAttempt.toString());
+    return failedDueToAttempt;
   }
   /** Get the task status */
-  public String getTaskStatus() { return datum.status.toString(); }
+  public String getTaskStatus() { return status; }
+  /** Get task counters */
+  public Counters getCounters() { return counters; }
   /** Get the event type */
-  public EventType getEventType() { return EventType.TASK_FAILED; }
+  public EventType getEventType() {
+    return EventType.TASK_FAILED;
+  }
 
-  
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java Fri Mar  1 19:37:03 2013
@@ -49,6 +49,8 @@ import org.apache.hadoop.mapreduce.TaskA
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.NetworkTopology;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * An abstract {@link InputFormat} that returns {@link CombineFileSplit}'s in 
  * {@link InputFormat#getSplits(JobContext)} method. 
@@ -76,7 +78,7 @@ import org.apache.hadoop.net.NetworkTopo
 @InterfaceStability.Stable
 public abstract class CombineFileInputFormat<K, V>
   extends FileInputFormat<K, V> {
-
+  
   public static final String SPLIT_MINSIZE_PERNODE = 
     "mapreduce.input.fileinputformat.split.minsize.per.node";
   public static final String SPLIT_MINSIZE_PERRACK = 
@@ -163,7 +165,6 @@ public abstract class CombineFileInputFo
   @Override
   public List<InputSplit> getSplits(JobContext job) 
     throws IOException {
-
     long minSizeNode = 0;
     long minSizeRack = 0;
     long maxSize = 0;
@@ -286,56 +287,100 @@ public abstract class CombineFileInputFo
                                  rackToNodes, maxSize);
       totLength += files[i].getLength();
     }
+    createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength, 
+                 maxSize, minSizeNode, minSizeRack, splits);
+  }
 
+  @VisibleForTesting
+  void createSplits(HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                     HashMap<OneBlockInfo, String[]> blockToNodes,
+                     HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                     long totLength,
+                     long maxSize,
+                     long minSizeNode,
+                     long minSizeRack,
+                     List<InputSplit> splits                     
+                    ) {
     ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
     Set<String> nodes = new HashSet<String>();
     long curSplitSize = 0;
+    
+    int numNodes = nodeToBlocks.size();
+    long totalLength = totLength;
 
-    // process all nodes and create splits that are local
-    // to a node. 
-    for (Iterator<Map.Entry<String, 
-         List<OneBlockInfo>>> iter = nodeToBlocks.entrySet().iterator(); 
-         iter.hasNext();) {
-
-      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
-      nodes.add(one.getKey());
-      List<OneBlockInfo> blocksInNode = one.getValue();
-
-      // for each block, copy it into validBlocks. Delete it from 
-      // blockToNodes so that the same block does not appear in 
-      // two different splits.
-      for (OneBlockInfo oneblock : blocksInNode) {
-        if (blockToNodes.containsKey(oneblock)) {
-          validBlocks.add(oneblock);
-          blockToNodes.remove(oneblock);
-          curSplitSize += oneblock.length;
-
-          // if the accumulated split size exceeds the maximum, then 
-          // create this split.
-          if (maxSize != 0 && curSplitSize >= maxSize) {
-            // create an input split and add it to the splits array
-            addCreatedSplit(splits, nodes, validBlocks);
-            curSplitSize = 0;
-            validBlocks.clear();
+    while(true) {
+      // it is allowed for maxSize to be 0. Disable smoothing load for such cases
+      int avgSplitsPerNode = maxSize > 0 && numNodes > 0 ?
+                                        ((int) (totalLength/maxSize))/numNodes
+                                        : Integer.MAX_VALUE;
+      int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;
+      numNodes = 0;
+
+      // process all nodes and create splits that are local to a node.
+      for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter = nodeToBlocks
+          .entrySet().iterator(); iter.hasNext();) {
+        Map.Entry<String, List<OneBlockInfo>> one = iter.next();
+        nodes.add(one.getKey());
+        List<OneBlockInfo> blocksInNode = one.getValue();
+
+        // for each block, copy it into validBlocks. Delete it from
+        // blockToNodes so that the same block does not appear in
+        // two different splits.
+        int splitsInNode = 0;
+        for (OneBlockInfo oneblock : blocksInNode) {
+          if (blockToNodes.containsKey(oneblock)) {
+            validBlocks.add(oneblock);
+            blockToNodes.remove(oneblock);
+            curSplitSize += oneblock.length;
+
+            // if the accumulated split size exceeds the maximum, then
+            // create this split.
+            if (maxSize != 0 && curSplitSize >= maxSize) {
+              // create an input split and add it to the splits array
+              addCreatedSplit(splits, nodes, validBlocks);
+              totalLength -= curSplitSize;
+              curSplitSize = 0;
+              validBlocks.clear();
+              splitsInNode++;
+              if (splitsInNode == maxSplitsByNodeOnly) {
+                // stop grouping on a node so as not to create
+                // disproportionately more splits on a node because it happens
+                // to have many blocks
+                // consider only these nodes in next round of grouping because
+                // they have leftover blocks that may need to be grouped
+                numNodes++;
+                break;
+              }
+            }
           }
         }
-      }
-      // if there were any blocks left over and their combined size is
-      // larger than minSplitNode, then combine them into one split.
-      // Otherwise add them back to the unprocessed pool. It is likely 
-      // that they will be combined with other blocks from the 
-      // same rack later on.
-      if (minSizeNode != 0 && curSplitSize >= minSizeNode) {
-        // create an input split and add it to the splits array
-        addCreatedSplit(splits, nodes, validBlocks);
-      } else {
-        for (OneBlockInfo oneblock : validBlocks) {
-          blockToNodes.put(oneblock, oneblock.hosts);
+        // if there were any blocks left over and their combined size is
+        // larger than minSplitNode, then combine them into one split.
+        // Otherwise add them back to the unprocessed pool. It is likely
+        // that they will be combined with other blocks from the
+        // same rack later on.
+        if (minSizeNode != 0 && curSplitSize >= minSizeNode
+            && splitsInNode == 0) {
+          // haven't created any split on this machine. so its ok to add a
+          // smaller
+          // one for parallelism. Otherwise group it in the rack for balanced
+          // size
+          // create an input split and add it to the splits array
+          addCreatedSplit(splits, nodes, validBlocks);
+          totalLength -= curSplitSize;
+        } else {
+          for (OneBlockInfo oneblock : validBlocks) {
+            blockToNodes.put(oneblock, oneblock.hosts);
+          }
         }
+        validBlocks.clear();
+        nodes.clear();
+        curSplitSize = 0;
+      }
+      
+      if (!(numNodes > 0 && totalLength > 0)) {
+        break;
       }
-      validBlocks.clear();
-      nodes.clear();
-      curSplitSize = 0;
     }
 
     // if blocks in a rack are below the specified minimum size, then keep them
@@ -458,7 +503,6 @@ public abstract class CombineFileInputFo
       offset[i] = validBlocks.get(i).offset;
       length[i] = validBlocks.get(i).length;
     }
-
      // add this split to the list that is returned
     CombineFileSplit thissplit = new CombineFileSplit(fl, offset, 
                                    length, locations.toArray(new String[0]));
@@ -474,7 +518,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one file from the File System
    */
-  private static class OneFileInfo {
+  @VisibleForTesting
+  static class OneFileInfo {
     private long fileSize;               // size of the file
     private OneBlockInfo[] blocks;       // all blocks in this file
 
@@ -545,45 +590,55 @@ public abstract class CombineFileInputFo
           }
           blocks = blocksList.toArray(new OneBlockInfo[blocksList.size()]);
         }
+        
+        populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                          nodeToBlocks, rackToNodes);
+      }
+    }
+    
+    @VisibleForTesting
+    static void populateBlockInfo(OneBlockInfo[] blocks,
+                          HashMap<String, List<OneBlockInfo>> rackToBlocks,
+                          HashMap<OneBlockInfo, String[]> blockToNodes,
+                          HashMap<String, List<OneBlockInfo>> nodeToBlocks,
+                          HashMap<String, Set<String>> rackToNodes) {
+      for (OneBlockInfo oneblock : blocks) {
+        // add this block to the block --> node locations map
+        blockToNodes.put(oneblock, oneblock.hosts);
+
+        // For blocks that do not have host/rack information,
+        // assign to the default rack.
+        String[] racks = null;
+        if (oneblock.hosts.length == 0) {
+          racks = new String[]{NetworkTopology.DEFAULT_RACK};
+        } else {
+          racks = oneblock.racks;
+        }
 
-        for (OneBlockInfo oneblock : blocks) {
-          // add this block to the block --> node locations map
-          blockToNodes.put(oneblock, oneblock.hosts);
-
-          // For blocks that do not have host/rack information,
-          // assign to default  rack.
-          String[] racks = null;
-          if (oneblock.hosts.length == 0) {
-            racks = new String[]{NetworkTopology.DEFAULT_RACK};
-          } else {
-            racks = oneblock.racks;
+        // add this block to the rack --> block map
+        for (int j = 0; j < racks.length; j++) {
+          String rack = racks[j];
+          List<OneBlockInfo> blklist = rackToBlocks.get(rack);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            rackToBlocks.put(rack, blklist);
           }
-
-          // add this block to the rack --> block map
-          for (int j = 0; j < racks.length; j++) {
-            String rack = racks[j];
-            List<OneBlockInfo> blklist = rackToBlocks.get(rack);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              rackToBlocks.put(rack, blklist);
-            }
-            blklist.add(oneblock);
-            if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
-              // Add this host to rackToNodes map
-              addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
-            }
+          blklist.add(oneblock);
+          if (!racks[j].equals(NetworkTopology.DEFAULT_RACK)) {
+            // Add this host to rackToNodes map
+            addHostToRack(rackToNodes, racks[j], oneblock.hosts[j]);
           }
+        }
 
-          // add this block to the node --> block map
-          for (int j = 0; j < oneblock.hosts.length; j++) {
-            String node = oneblock.hosts[j];
-            List<OneBlockInfo> blklist = nodeToBlocks.get(node);
-            if (blklist == null) {
-              blklist = new ArrayList<OneBlockInfo>();
-              nodeToBlocks.put(node, blklist);
-            }
-            blklist.add(oneblock);
+        // add this block to the node --> block map
+        for (int j = 0; j < oneblock.hosts.length; j++) {
+          String node = oneblock.hosts[j];
+          List<OneBlockInfo> blklist = nodeToBlocks.get(node);
+          if (blklist == null) {
+            blklist = new ArrayList<OneBlockInfo>();
+            nodeToBlocks.put(node, blklist);
           }
+          blklist.add(oneblock);
         }
       }
     }
@@ -600,7 +655,8 @@ public abstract class CombineFileInputFo
   /**
    * information about one block from the File System
    */
-  private static class OneBlockInfo {
+  @VisibleForTesting
+  static class OneBlockInfo {
     Path onepath;                // name of this file
     long offset;                 // offset in file
     long length;                 // length of this block

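A minimal, self-contained sketch of the per-node cap arithmetic introduced
in createSplits above, using the same numbers as the new testNodeInputSplit
later in this patch (12 blocks of 100 bytes replicated on 2 nodes, maximum
split size 200); the class name and main() harness are illustrative, not
part of the patch:

    public class NodeCapSketch {
      public static void main(String[] args) {
        long totalLength = 12 * 100L; // 12 blocks of 100 bytes
        long maxSize = 200L;          // maximum split size
        int numNodes = 2;             // h1 and h2

        // same arithmetic as the patch: cap each node at the average number
        // of splits per node, with a floor of one split
        int avgSplitsPerNode = (maxSize > 0 && numNodes > 0)
            ? ((int) (totalLength / maxSize)) / numNodes
            : Integer.MAX_VALUE;
        int maxSplitsByNodeOnly = (avgSplitsPerNode > 0) ? avgSplitsPerNode : 1;

        // 1200/200 = 6 splits overall, so each node is capped at 3; without
        // the cap the first node iterated could claim all 6 splits
        System.out.println("cap per node = " + maxSplitsByNodeOnly); // 3
      }
    }
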
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/split/SplitMetaInfoReader.java Fri Mar  1 19:37:03 2013
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapreduce.spli
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -29,9 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 
 /**
  * A utility that reads the split meta info and creates
@@ -44,8 +44,8 @@ public class SplitMetaInfoReader {
   public static JobSplit.TaskSplitMetaInfo[] readSplitMetaInfo(
       JobID jobId, FileSystem fs, Configuration conf, Path jobSubmitDir) 
   throws IOException {
-    long maxMetaInfoSize = conf.getLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 
-        10000000L);
+    long maxMetaInfoSize = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
+        MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
     Path metaSplitFile = JobSubmissionFiles.getJobSplitMetaFile(jobSubmitDir);
     String jobSplitFile = JobSubmissionFiles.getJobSplitFile(jobSubmitDir).toString();
     FileStatus fStatus = fs.getFileStatus(metaSplitFile);

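For context on the key swap above, a hedged usage sketch; the constants are
the ones referenced in the diff, while the 50000000L value is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;

    public class SplitMetaInfoLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // jobs with very many splits may need a larger limit (illustrative)
        conf.setLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, 50000000L);
        long max = conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE,
            MRJobConfig.DEFAULT_SPLIT_METAINFO_MAXSIZE);
        System.out.println("split meta info limit = " + max);
      }
    }
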
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/MergeManagerImpl.java Fri Mar  1 19:37:03 2013
@@ -475,9 +475,9 @@ public class MergeManagerImpl<K, V> impl
           combineCollector.setWriter(writer);
           combineAndSpill(rIter, reduceCombineInputCounter);
         }
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
 
         LOG.info(reduceId +  
             " Merge of the " + noInMemorySegments +
@@ -552,9 +552,9 @@ public class MergeManagerImpl<K, V> impl
                             mergedMapOutputsCounter, null);
 
         Merger.writeFile(iter, writer, reporter, jobConf);
+        writer.close();
         compressAwarePath = new CompressAwarePath(outputPath,
             writer.getRawLength());
-        writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
         throw e;
@@ -713,13 +713,15 @@ public class MergeManagerImpl<K, V> impl
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
             tmpDir, comparator, reporter, spilledRecordsCounter, null, 
             mergePhase);
-        final Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
+        Writer<K,V> writer = new Writer<K,V>(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
           Merger.writeFile(rIter, writer, reporter, job);
-          // add to list of final disk outputs.
+          writer.close();
+          // add to list of final disk outputs.
+          onDiskMapOutputs.add(new CompressAwarePath(outputPath,
+              writer.getRawLength()));
+          writer = null;
         } catch (IOException e) {
           if (null != outputPath) {
             try {

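The writer.close() reorderings above share one rationale: a writer's length
is only final after close() has flushed buffered data and written any
trailer, so reading getRawLength() first records a short length. A small
standalone illustration of the same pitfall with a plain GZIPOutputStream
(not MergeManager code):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.util.zip.GZIPOutputStream;

    public class CloseBeforeLengthSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream out = new GZIPOutputStream(sink);
        out.write(new byte[8192]);

        int before = sink.size(); // compressed bytes may still sit in the deflater
        out.close();              // close() flushes and appends the gzip trailer
        int after = sink.size();

        // 'before' understates the true output size, just as calling
        // writer.getRawLength() before writer.close() did in the old code
        System.out.println(before + " -> " + after);
      }
    }
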
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java Fri Mar  1 19:37:03 2013
@@ -521,6 +521,8 @@ public class ConfigUtil {
     });
     Configuration.addDeprecation("mapreduce.user.classpath.first",
       MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
+    Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
+        MRJobConfig.SPLIT_METAINFO_MAXSIZE);
   }
 
   public static void main(String[] args) {

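A hedged sketch of what the new deprecation entry buys: a legacy
configuration that still sets the JobTracker-era key is transparently read
back through the new MRJobConfig key. The 123L value is illustrative, and
loadResources() is invoked here only so the standalone snippet registers
the deprecation table (a normal job does this during framework
initialization):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
    import org.apache.hadoop.mapreduce.util.ConfigUtil;

    public class DeprecationSketch {
      public static void main(String[] args) {
        ConfigUtil.loadResources(); // registers the deprecation mappings
        Configuration conf = new Configuration(false);
        conf.setLong(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE, 123L); // old key
        // resolves via the deprecation table to the new key
        System.out.println(conf.getLong(MRJobConfig.SPLIT_METAINFO_MAXSIZE, -1L));
      }
    }
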
Propchange: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml:r1448457-1451693

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryParsing.java Fri Mar  1 19:37:03 2013
@@ -404,7 +404,7 @@ public class TestJobHistoryParsing {
     }
   }
   
-  @Test
+  @Test (timeout=5000)
   public void testCountersForFailedTask() throws Exception {
     LOG.info("STARTING testCountersForFailedTask");
     try {
@@ -455,6 +455,9 @@ public class TestJobHistoryParsing {
       CompletedTask ct = new CompletedTask(yarnTaskID, entry.getValue());
       Assert.assertNotNull("completed task report has null counters",
           ct.getReport().getCounters());
+      // Make sure every completed task has counters and that they are not empty
+      Assert.assertTrue(ct.getReport().getCounters()
+          .getAllCounterGroups().size() > 0);
     }
     } finally {
       LOG.info("FINISHED testCountersForFailedTask");

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Fri Mar  1 19:37:03 2013
@@ -106,8 +106,9 @@ public class ResourceMgrDelegate extends
 
   public QueueInfo getQueue(String queueName) throws IOException,
   InterruptedException {
-    return TypeConverter.fromYarn(
-        super.getQueueInfo(queueName), this.conf);
+    org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
+        super.getQueueInfo(queueName);
+    return (queueInfo == null) ? null : TypeConverter.fromYarn(queueInfo, conf);
   }
 
   public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException,

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestJobConf.java Fri Mar  1 19:37:03 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.ClassUtil;
 
 
 import static org.junit.Assert.*;
@@ -79,7 +80,7 @@ public class TestJobConf {
     Class clazz = Class.forName(CLASSNAME, true, cl);
     assertNotNull(clazz);
 
-    String containingJar = JobConf.findContainingJar(clazz);
+    String containingJar = ClassUtil.findContainingJar(clazz);
     assertEquals(jar.getAbsolutePath(), containingJar);
   }
 }

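The helper the test now calls lives in hadoop-common rather than JobConf; a
hedged usage sketch (the probe class is arbitrary):

    import org.apache.hadoop.util.ClassUtil;

    public class FindJarSketch {
      public static void main(String[] args) {
        // returns the path of the jar that loaded the class, or null when
        // the class came from a directory (e.g. a build output tree)
        String jar = ClassUtil.findContainingJar(
            org.apache.hadoop.conf.Configuration.class);
        System.out.println(jar);
      }
    }
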
Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java Fri Mar  1 19:37:03 2013
@@ -20,11 +20,14 @@ package org.apache.hadoop.mapreduce.lib.
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
 import java.util.zip.GZIPOutputStream;
 import java.util.concurrent.TimeoutException;
 
+import junit.framework.Assert;
 import junit.framework.TestCase;
 
 import org.apache.hadoop.fs.*;
@@ -42,9 +45,13 @@ import org.apache.hadoop.mapreduce.Recor
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneBlockInfo;
+import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat.OneFileInfo;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.junit.Test;
 
+import com.google.common.collect.HashMultiset;
+
 public class TestCombineFileInputFormat extends TestCase {
 
   private static final String rack1[] = new String[] {
@@ -476,23 +483,23 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file3.getName(), fileSplit.getPath(0).getName());
-      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
-      assertEquals(0, fileSplit.getOffset(1));
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(2, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
-      assertEquals(BLOCKSIZE, fileSplit.getOffset(0));
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file3.getName(), fileSplit.getPath(1).getName());
       assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 3 blocks 
       inFormat = new DummyInputFormat();
@@ -504,7 +511,7 @@ public class TestCombineFileInputFormat 
       for (InputSplit split : splits) {
         System.out.println("File split(Test5): " + split);
       }
-      assertEquals(4, splits.size());
+      assertEquals(3, splits.size());
       fileSplit = (CombineFileSplit) splits.get(0);
       assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
@@ -519,32 +526,28 @@ public class TestCombineFileInputFormat 
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
       assertEquals(file4.getName(), fileSplit.getPath(2).getName());
-      assertEquals( 2 * BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(0, fileSplit.getOffset(2));
       assertEquals(BLOCKSIZE, fileSplit.getLength(2));
-      assertEquals("host3.rack3.com", fileSplit.getLocations()[0]);
+      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
       fileSplit = (CombineFileSplit) splits.get(2);
-      assertEquals(2, fileSplit.getNumPaths());
+      assertEquals(3, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
       assertEquals(BLOCKSIZE, fileSplit.getLength(0));
-      assertEquals(file2.getName(), fileSplit.getPath(1).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(1).getName());
       assertEquals(BLOCKSIZE, fileSplit.getOffset(1));
       assertEquals(BLOCKSIZE, fileSplit.getLength(1));
-      assertEquals("host2.rack2.com", fileSplit.getLocations()[0]);
-      fileSplit = (CombineFileSplit) splits.get(3);
-      assertEquals(1, fileSplit.getNumPaths());
-      assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
-      assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(BLOCKSIZE, fileSplit.getLength(0));
+      assertEquals(file4.getName(), fileSplit.getPath(2).getName());
+      assertEquals(2 * BLOCKSIZE, fileSplit.getOffset(2));
+      assertEquals(BLOCKSIZE, fileSplit.getLength(2));
       assertEquals("host1.rack1.com", fileSplit.getLocations()[0]);
 
       // maximum split size is 4 blocks 
@@ -713,6 +716,56 @@ public class TestCombineFileInputFormat 
     DFSTestUtil.waitReplication(fileSys, name, replication);
   }
   
+  public void testNodeInputSplit() throws IOException, InterruptedException {
+    // Regression test for MAPREDUCE-4892. There are 2 nodes with all blocks on 
+    // both nodes. The grouping ensures that both nodes get splits instead of 
+    // just the first node.
+    DummyInputFormat inFormat = new DummyInputFormat();
+    int numBlocks = 12;
+    long totLength = 0;
+    long blockSize = 100;
+    long maxSize = 200;
+    long minSizeNode = 50;
+    long minSizeRack = 50;
+    String[] locations = { "h1", "h2" };
+    String[] racks = new String[0];
+    Path path = new Path("hdfs://file");
+    
+    OneBlockInfo[] blocks = new OneBlockInfo[numBlocks];
+    for (int i = 0; i < numBlocks; ++i) {
+      blocks[i] = new OneBlockInfo(path, i * blockSize, blockSize, locations, racks);
+      totLength += blockSize;
+    }
+    
+    List<InputSplit> splits = new ArrayList<InputSplit>();
+    HashMap<String, Set<String>> rackToNodes = 
+                              new HashMap<String, Set<String>>();
+    HashMap<String, List<OneBlockInfo>> rackToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    HashMap<OneBlockInfo, String[]> blockToNodes = 
+                              new HashMap<OneBlockInfo, String[]>();
+    HashMap<String, List<OneBlockInfo>> nodeToBlocks = 
+                              new HashMap<String, List<OneBlockInfo>>();
+    
+    OneFileInfo.populateBlockInfo(blocks, rackToBlocks, blockToNodes, 
+                             nodeToBlocks, rackToNodes);
+    
+    inFormat.createSplits(nodeToBlocks, blockToNodes, rackToBlocks, totLength,  
+                          maxSize, minSizeNode, minSizeRack, splits);
+    
+    int expectedSplitCount = (int) (totLength / maxSize);
+    Assert.assertEquals(expectedSplitCount, splits.size());
+    HashMultiset<String> nodeSplits = HashMultiset.create();
+    for (int i = 0; i < expectedSplitCount; ++i) {
+      InputSplit inSplit = splits.get(i);
+      Assert.assertEquals(maxSize, inSplit.getLength());
+      Assert.assertEquals(1, inSplit.getLocations().length);
+      nodeSplits.add(inSplit.getLocations()[0]);
+    }
+    Assert.assertEquals(3, nodeSplits.count(locations[0]));
+    Assert.assertEquals(3, nodeSplits.count(locations[1]));
+  }
+  
   public void testSplitPlacementForCompressedFiles() throws Exception {
     MiniDFSCluster dfs = null;
     FileSystem fileSys = null;
@@ -889,24 +942,24 @@ public class TestCombineFileInputFormat 
       assertEquals(f3.getLen(), fileSplit.getLength(0));
       assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
       fileSplit = (CombineFileSplit) splits.get(1);
-      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f4.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
+      assertEquals(f2.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
       fileSplit = (CombineFileSplit) splits.get(2);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file2.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f2.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts2[0], fileSplit.getLocations()[0]); // should be on r2
+      assertEquals(f1.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
       fileSplit = (CombineFileSplit) splits.get(3);
       assertEquals(1, fileSplit.getNumPaths());
       assertEquals(1, fileSplit.getLocations().length);
-      assertEquals(file1.getName(), fileSplit.getPath(0).getName());
+      assertEquals(file4.getName(), fileSplit.getPath(0).getName());
       assertEquals(0, fileSplit.getOffset(0));
-      assertEquals(f1.getLen(), fileSplit.getLength(0));
-      assertEquals(hosts1[0], fileSplit.getLocations()[0]); // should be on r1
+      assertEquals(f4.getLen(), fileSplit.getLength(0));
+      assertEquals(hosts3[0], fileSplit.getLocations()[0]); // should be on r3
 
       // maximum split size is twice file1's length
       inFormat = new DummyInputFormat();

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/hadoop-mapreduce-client/pom.xml Fri Mar  1 19:37:03 2013
@@ -130,7 +130,7 @@
       <scope>test</scope>
     </dependency>
     <dependency>
-      <groupId>org.jboss.netty</groupId>
+      <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
     <dependency>

Modified: hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/pom.xml?rev=1451695&r1=1451694&r2=1451695&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/pom.xml (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-mapreduce-project/pom.xml Fri Mar  1 19:37:03 2013
@@ -61,7 +61,7 @@
           <artifactId>ant</artifactId>
         </exclusion>
         <exclusion>
-          <groupId>org.jboss.netty</groupId>
+          <groupId>io.netty</groupId>
           <artifactId>netty</artifactId>
         </exclusion>
         <exclusion>
@@ -151,7 +151,7 @@
       <artifactId>junit</artifactId>
     </dependency>
     <dependency>
-      <groupId>org.jboss.netty</groupId>
+      <groupId>io.netty</groupId>
       <artifactId>netty</artifactId>
     </dependency>
     <dependency>


