hadoop-mapreduce-commits mailing list archives

From a..@apache.org
Subject svn commit: r1446832 [1/2] - in /hadoop/common/branches/HDFS-347/hadoop-mapreduce-project: ./ conf/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-...
Date Sat, 16 Feb 2013 01:12:20 GMT
Author: atm
Date: Sat Feb 16 01:12:07 2013
New Revision: 1446832

URL: http://svn.apache.org/r1446832
Log:
Merge trunk into HDFS-347 branch.

Added:
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/site/apt/PluggableShuffleAndPluggableSort.apt.vm
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/CommonStub.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationRunnableStub.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeApplicationStub.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/PipeReducerStub.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipeApplication.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java
      - copied unchanged from r1446830, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/pipes/TestPipesNonJavaInputFormat.java
Modified:
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/   (props changed)
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/conf/   (props changed)
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskAttemptFinishedEvent.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml   (props changed)
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTaskPage.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksBlock.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/webapp/HsTasksPage.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestKeyValueTextInputFormat.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestLineRecordReader.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/lib/TestChainMapReduce.java
    hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/TestClientProtocolProviderImpls.java

Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project:r1440578-1446830

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt Sat Feb 16 01:12:07 2013
@@ -152,7 +152,29 @@ Trunk (Unreleased)
     MAPREDUCE-4884. Streaming tests fail to start MiniMRCluster due to missing
     queue configuration. (Chris Nauroth via suresh)
 
-Release 2.0.3-alpha - Unreleased 
+Release 2.0.4-beta - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+  BUG FIXES
+
+    MAPREDUCE-4671. AM does not tell the RM about container requests which are
+    no longer needed. (Bikas Saha via sseth)
+
+    MAPREDUCE-4994. -jt generic command line option does not work. (sandyr via tucu)
+
+    MAPREDUCE-5000. Fixes getCounters when speculating by fixing the selection
+    of the best attempt for a task. (Jason Lowe via sseth)
+
+    MAPREDUCE-4994. Addendum fixing testcases failures. (sandyr via tucu)
+
+Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES
 
@@ -215,8 +237,17 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-4838. Add additional fields like Locality, Avataar to the
     JobHistory logs. (Zhijie Shen via sseth)
 
+    MAPREDUCE-4971. Minor extensibility enhancements to Counters & 
+    FileOutputFormat. (Arun C Murthy via sseth)
+
+    MAPREDUCE-4977. Documentation for pluggable shuffle and pluggable sort. 
+    (tucu)
+
   OPTIMIZATIONS
 
+    MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of
+    containers to get maximum locality. (Bikas Saha via vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-4607. Race condition in ReduceTask completion can result in Task
@@ -278,6 +309,11 @@ Release 2.0.3-alpha - Unreleased 
     MAPREDUCE-2264. Job status exceeds 100% in some cases. 
     (devaraj.k and sandyr via tucu)
 
+    MAPREDUCE-4969. TestKeyValueTextInputFormat test fails with Open JDK 7.
+    (Arpit Agarwal via suresh)
+
+    MAPREDUCE-4953. HadoopPipes misuses fprintf. (Andy Isaacson via atm)
+
 Release 2.0.2-alpha - 2012-09-07 
 
   INCOMPATIBLE CHANGES
@@ -662,16 +698,28 @@ Release 0.23.7 - UNRELEASED
 
   IMPROVEMENTS
 
+    MAPREDUCE-4905. test org.apache.hadoop.mapred.pipes 
+    (Aleksey Gorshkov via bobby)
+
+    MAPREDUCE-4989. JSONify DataTables input data for Attempts page (Ravi
+    Prakash via jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-4946. Fix a performance problem for large jobs by reducing the
     number of map completion event type conversions. (Jason Lowe via sseth)
 
+    MAPREDUCE-4822. Unnecessary conversions in History Events. (Chu Tong via
+    jlowe)
+
   BUG FIXES
 
     MAPREDUCE-4458. Warn if java.library.path is used for AM or Task
     (Robert Parker via jeagles)
 
+    MAPREDUCE-4992. AM hangs in RecoveryService when recovering tasks with
+    speculative attempts (Robert Parker via jlowe)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt:r1440578-1446830

Propchange: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/conf/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-mapreduce-project/conf:r1440578-1446830

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Sat Feb 16 01:12:07 2013
@@ -539,6 +539,10 @@ public abstract class TaskImpl implement
   //select the nextAttemptNumber with best progress
   // always called inside the Read Lock
   private TaskAttempt selectBestAttempt() {
+    if (successfulAttempt != null) {
+      return attempts.get(successfulAttempt);
+    }
+
     float progress = 0f;
     TaskAttempt result = null;
     for (TaskAttempt at : attempts.values()) {
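
The hunk above is the MAPREDUCE-5000 fix noted in CHANGES.txt: once a task has a successful attempt, selectBestAttempt() must return that attempt rather than whichever attempt happens to report the highest progress, otherwise getCounters() can read counters from a still-running speculative attempt. A minimal standalone sketch of the selection rule (the Attempt type below is a hypothetical stand-in, not the real TaskImpl API):

    import java.util.Arrays;
    import java.util.Collection;

    class BestAttemptSketch {
      static class Attempt {
        final String id; final float progress; final boolean succeeded;
        Attempt(String id, float progress, boolean succeeded) {
          this.id = id; this.progress = progress; this.succeeded = succeeded;
        }
      }

      // Prefer the successful attempt unconditionally; fall back to best
      // progress only while no attempt has succeeded yet.
      static Attempt selectBest(Collection<Attempt> attempts) {
        Attempt best = null;
        for (Attempt a : attempts) {
          if (a.succeeded) {
            return a; // counters must come from the attempt that succeeded
          }
          if (best == null || a.progress > best.progress) {
            best = a;
          }
        }
        return best;
      }

      public static void main(String[] args) {
        // a speculative attempt may report higher progress than the winner
        Attempt winner = new Attempt("a1", 0.9f, true);
        Attempt spec = new Attempt("a2", 1.0f, false);
        System.out.println(selectBest(Arrays.asList(spec, winner)).id); // a1
      }
    }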

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Sat Feb 16 01:12:07 2013
@@ -21,9 +21,12 @@ package org.apache.hadoop.mapreduce.v2.a
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,6 +38,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
@@ -108,7 +112,7 @@ public class RecoveryService extends Com
   private JobInfo jobInfo = null;
   private final Map<TaskId, TaskInfo> completedTasks =
     new HashMap<TaskId, TaskInfo>();
-
+  
   private final List<TaskEvent> pendingTaskScheduleEvents =
     new ArrayList<TaskEvent>();
 
@@ -193,6 +197,14 @@ public class RecoveryService extends Com
         .getAllTasks();
     for (TaskInfo taskInfo : taskInfos.values()) {
       if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+        Iterator<Entry<TaskAttemptID, TaskAttemptInfo>> taskAttemptIterator = 
+            taskInfo.getAllTaskAttempts().entrySet().iterator();
+        while (taskAttemptIterator.hasNext()) {
+          Map.Entry<TaskAttemptID, TaskAttemptInfo> currentEntry = taskAttemptIterator.next();
+          if (!jobInfo.getAllCompletedTaskAttempts().containsKey(currentEntry.getKey())) {
+            taskAttemptIterator.remove();
+          }
+        }
         completedTasks
             .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
         LOG.info("Read from history task "
@@ -215,6 +227,7 @@ public class RecoveryService extends Com
         JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(conf, jobId);
     Path histDirPath =
         FileContext.getFileContext(conf).makeQualified(new Path(jobhistoryDir));
+    LOG.info("Trying file " + histDirPath.toString());
     FileContext fc = FileContext.getFileContext(histDirPath.toUri(), conf);
     // read the previous history file
     historyFile =
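
This RecoveryService change pairs with MAPREDUCE-4992 in CHANGES.txt: before a succeeded task is recorded as completed, any attempt missing from the job's completed-attempt set (for example a killed or vanished speculative attempt) is pruned, so recovery does not hang waiting on an attempt that will never report back. A minimal sketch of the Iterator.remove() filtering pattern, with plain strings standing in for TaskAttemptID/TaskAttemptInfo:

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;

    class PruneAttemptsSketch {
      // Drop attempts that never completed, mutating the map in place via
      // Iterator.remove(), the same pattern as the hunk above.
      static void pruneIncomplete(Map<String, String> attempts,
                                  Set<String> completed) {
        Iterator<Map.Entry<String, String>> it =
            attempts.entrySet().iterator();
        while (it.hasNext()) {
          if (!completed.contains(it.next().getKey())) {
            it.remove();
          }
        }
      }

      public static void main(String[] args) {
        Map<String, String> attempts = new HashMap<>();
        attempts.put("attempt_0", "SUCCEEDED");
        attempts.put("attempt_1", "speculative, launch failed");
        Set<String> completed = new HashSet<>();
        completed.add("attempt_0");
        pruneIncomplete(attempts, completed);
        System.out.println(attempts); // {attempt_0=SUCCEEDED}
      }
    }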

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Sat Feb 16 01:12:07 2013
@@ -747,7 +747,7 @@ public class RMContainerAllocator extend
       addContainerReq(req);
     }
     
-    @SuppressWarnings("unchecked")
+    // this method will change the list of allocatedContainers.
     private void assign(List<Container> allocatedContainers) {
       Iterator<Container> it = allocatedContainers.iterator();
       LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -788,84 +788,97 @@ public class RMContainerAllocator extend
                 + reduces.isEmpty()); 
             isAssignable = false;
           }
-        }          
+        } else {
+          LOG.warn("Container allocated at unwanted priority: " + priority + 
+              ". Returning to RM...");
+          isAssignable = false;
+        }
         
-        boolean blackListed = false;         
-        ContainerRequest assigned = null;
+        if(!isAssignable) {
+          // release container if we could not assign it 
+          containerNotAssigned(allocated);
+          it.remove();
+          continue;
+        }
         
-        if (isAssignable) {
-          // do not assign if allocated container is on a  
-          // blacklisted host
-          String allocatedHost = allocated.getNodeId().getHost();
-          blackListed = isNodeBlacklisted(allocatedHost);
-          if (blackListed) {
-            // we need to request for a new container 
-            // and release the current one
-            LOG.info("Got allocated container on a blacklisted "
-                + " host "+allocatedHost
-                +". Releasing container " + allocated);
-
-            // find the request matching this allocated container 
-            // and replace it with a new one 
-            ContainerRequest toBeReplacedReq = 
-                getContainerReqToReplace(allocated);
-            if (toBeReplacedReq != null) {
-              LOG.info("Placing a new container request for task attempt " 
-                  + toBeReplacedReq.attemptID);
-              ContainerRequest newReq = 
-                  getFilteredContainerRequest(toBeReplacedReq);
-              decContainerReq(toBeReplacedReq);
-              if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
-                  TaskType.MAP) {
-                maps.put(newReq.attemptID, newReq);
-              }
-              else {
-                reduces.put(newReq.attemptID, newReq);
-              }
-              addContainerReq(newReq);
+        // do not assign if allocated container is on a  
+        // blacklisted host
+        String allocatedHost = allocated.getNodeId().getHost();
+        if (isNodeBlacklisted(allocatedHost)) {
+          // we need to request for a new container 
+          // and release the current one
+          LOG.info("Got allocated container on a blacklisted "
+              + " host "+allocatedHost
+              +". Releasing container " + allocated);
+
+          // find the request matching this allocated container 
+          // and replace it with a new one 
+          ContainerRequest toBeReplacedReq = 
+              getContainerReqToReplace(allocated);
+          if (toBeReplacedReq != null) {
+            LOG.info("Placing a new container request for task attempt " 
+                + toBeReplacedReq.attemptID);
+            ContainerRequest newReq = 
+                getFilteredContainerRequest(toBeReplacedReq);
+            decContainerReq(toBeReplacedReq);
+            if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+                TaskType.MAP) {
+              maps.put(newReq.attemptID, newReq);
             }
             else {
-              LOG.info("Could not map allocated container to a valid request."
-                  + " Releasing allocated container " + allocated);
+              reduces.put(newReq.attemptID, newReq);
             }
+            addContainerReq(newReq);
           }
           else {
-            assigned = assign(allocated);
-            if (assigned != null) {
-              // Update resource requests
-              decContainerReq(assigned);
-
-              // send the container-assigned event to task attempt
-              eventHandler.handle(new TaskAttemptContainerAssignedEvent(
-                  assigned.attemptID, allocated, applicationACLs));
-
-              assignedRequests.add(allocated, assigned.attemptID);
-
-              if (LOG.isDebugEnabled()) {
-                LOG.info("Assigned container (" + allocated + ") "
-                    + " to task " + assigned.attemptID + " on node "
-                    + allocated.getNodeId().toString());
-              }
-            }
-            else {
-              //not assigned to any request, release the container
-              LOG.info("Releasing unassigned and invalid container " 
-                  + allocated + ". RM has gone crazy, someone go look!"
-                  + " Hey RM, if you are so rich, go donate to non-profits!");
-            }
+            LOG.info("Could not map allocated container to a valid request."
+                + " Releasing allocated container " + allocated);
           }
+          
+          // release container if we could not assign it 
+          containerNotAssigned(allocated);
+          it.remove();
+          continue;
         }
-        
-        // release container if it was blacklisted 
-        // or if we could not assign it 
-        if (blackListed || assigned == null) {
-          containersReleased++;
-          release(allocated.getId());
-        }
+      }
+
+      assignContainers(allocatedContainers);
+       
+      // release container if we could not assign it 
+      it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        Container allocated = it.next();
+        LOG.info("Releasing unassigned and invalid container " 
+            + allocated + ". RM may have assignment issues");
+        containerNotAssigned(allocated);
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    private void containerAssigned(Container allocated, 
+                                    ContainerRequest assigned) {
+      // Update resource requests
+      decContainerReq(assigned);
+
+      // send the container-assigned event to task attempt
+      eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+          assigned.attemptID, allocated, applicationACLs));
+
+      assignedRequests.add(allocated, assigned.attemptID);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.info("Assigned container (" + allocated + ") "
+            + " to task " + assigned.attemptID + " on node "
+            + allocated.getNodeId().toString());
       }
     }
     
-    private ContainerRequest assign(Container allocated) {
+    private void containerNotAssigned(Container allocated) {
+      containersReleased++;
+      release(allocated.getId());      
+    }
+    
+    private ContainerRequest assignWithoutLocality(Container allocated) {
       ContainerRequest assigned = null;
       
       Priority priority = allocated.getPriority();
@@ -877,18 +890,24 @@ public class RMContainerAllocator extend
           LOG.debug("Assigning container " + allocated + " to reduce");
         }
         assigned = assignToReduce(allocated);
-      } else if (PRIORITY_MAP.equals(priority)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Assigning container " + allocated + " to map");
-        }
-        assigned = assignToMap(allocated);
-      } else {
-        LOG.warn("Container allocated at unwanted priority: " + priority + 
-            ". Returning to RM...");
       }
         
       return assigned;
     }
+        
+    private void assignContainers(List<Container> allocatedContainers) {
+      Iterator<Container> it = allocatedContainers.iterator();
+      while (it.hasNext()) {
+        Container allocated = it.next();
+        ContainerRequest assigned = assignWithoutLocality(allocated);
+        if (assigned != null) {
+          containerAssigned(allocated, assigned);
+          it.remove();
+        }
+      }
+
+      assignMapsWithLocality(allocatedContainers);
+    }
     
     private ContainerRequest getContainerReqToReplace(Container allocated) {
       LOG.info("Finding containerReq for allocated container: " + allocated);
@@ -959,11 +978,15 @@ public class RMContainerAllocator extend
     }
     
     @SuppressWarnings("unchecked")
-    private ContainerRequest assignToMap(Container allocated) {
-    //try to assign to maps if present 
-      //first by host, then by rack, followed by *
-      ContainerRequest assigned = null;
-      while (assigned == null && maps.size() > 0) {
+    private void assignMapsWithLocality(List<Container> allocatedContainers) {
+      // try to assign to all nodes first to match node local
+      Iterator<Container> it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();        
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        // "if (maps.containsKey(tId))" below should be almost always true.
+        // hence this while loop would almost always have O(1) complexity
         String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
         while (list != null && list.size() > 0) {
@@ -972,7 +995,9 @@ public class RMContainerAllocator extend
           }
           TaskAttemptId tId = list.removeFirst();
           if (maps.containsKey(tId)) {
-            assigned = maps.remove(tId);
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
             jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
@@ -984,39 +1009,56 @@ public class RMContainerAllocator extend
             break;
           }
         }
-        if (assigned == null) {
-          String rack = RackResolver.resolve(host).getNetworkLocation();
-          list = mapsRackMapping.get(rack);
-          while (list != null && list.size() > 0) {
-            TaskAttemptId tId = list.removeFirst();
-            if (maps.containsKey(tId)) {
-              assigned = maps.remove(tId);
-              JobCounterUpdateEvent jce =
-                new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-              jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
-              eventHandler.handle(jce);
-              rackLocalAssigned++;
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("Assigned based on rack match " + rack);
-              }
-              break;
-            }
-          }
-          if (assigned == null && maps.size() > 0) {
-            TaskAttemptId tId = maps.keySet().iterator().next();
-            assigned = maps.remove(tId);
+      }
+      
+      // try to match all rack local
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        // "if (maps.containsKey(tId))" below should be almost always true.
+        // hence this while loop would almost always have O(1) complexity
+        String host = allocated.getNodeId().getHost();
+        String rack = RackResolver.resolve(host).getNetworkLocation();
+        LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+        while (list != null && list.size() > 0) {
+          TaskAttemptId tId = list.removeFirst();
+          if (maps.containsKey(tId)) {
+            ContainerRequest assigned = maps.remove(tId);
+            containerAssigned(allocated, assigned);
+            it.remove();
             JobCounterUpdateEvent jce =
               new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
-            jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+            jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
             eventHandler.handle(jce);
+            rackLocalAssigned++;
             if (LOG.isDebugEnabled()) {
-              LOG.debug("Assigned based on * match");
+              LOG.debug("Assigned based on rack match " + rack);
             }
             break;
           }
         }
       }
-      return assigned;
+      
+      // assign remaining
+      it = allocatedContainers.iterator();
+      while(it.hasNext() && maps.size() > 0){
+        Container allocated = it.next();
+        Priority priority = allocated.getPriority();
+        assert PRIORITY_MAP.equals(priority);
+        TaskAttemptId tId = maps.keySet().iterator().next();
+        ContainerRequest assigned = maps.remove(tId);
+        containerAssigned(allocated, assigned);
+        it.remove();
+        JobCounterUpdateEvent jce =
+          new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+        jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+        eventHandler.handle(jce);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Assigned based on * match");
+        }
+      }
     }
   }
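
The restructuring above (MAPREDUCE-4893 in CHANGES.txt) replaces one pass per container with one pass per locality level over all containers: every allocated container is first tried for a node-local map, then for a rack-local map, and only then matched arbitrarily, so the order in which the RM happens to list containers can no longer cost node locality. A minimal sketch of that three-pass ordering (the string-keyed maps and rackOf lookup are hypothetical stand-ins for maps/mapsHostMapping/mapsRackMapping and RackResolver):

    import java.util.Iterator;
    import java.util.LinkedHashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.BiPredicate;

    class LocalityPassesSketch {
      // Match allocated container hosts against pending map requests in
      // three passes, so a node-local match is never lost to a container
      // that merely appeared earlier in the RM's allocated list.
      static Map<String, String> assign(List<String> containerHosts,
          Map<String, String> pendingMaps,   // taskId -> preferred host
          Map<String, String> rackOf) {      // host -> rack (all hosts known)
        Map<String, String> assigned = new LinkedHashMap<>();
        pass(containerHosts, pendingMaps, assigned, String::equals);
        pass(containerHosts, pendingMaps, assigned,
            (host, want) -> rackOf.get(host).equals(rackOf.get(want)));
        pass(containerHosts, pendingMaps, assigned, (host, want) -> true);
        return assigned;
      }

      static void pass(List<String> containerHosts,
          Map<String, String> pendingMaps, Map<String, String> assigned,
          BiPredicate<String, String> matches) {
        Iterator<String> it = containerHosts.iterator();
        while (it.hasNext() && !pendingMaps.isEmpty()) {
          String host = it.next();
          for (Map.Entry<String, String> e : pendingMaps.entrySet()) {
            if (matches.test(host, e.getValue())) {
              assigned.put(e.getKey(), host); // taskId -> container host
              pendingMaps.remove(e.getKey());
              it.remove();                    // container consumed
              break;
            }
          }
        }
      }
    }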
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Sat Feb 16 01:12:07 2013
@@ -72,7 +72,10 @@ public abstract class RMContainerRequest
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
 
-  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+  // use custom comparator to make sure ResourceRequest objects differing only in
+  // numContainers don't end up as duplicates
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
   private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
 
   private boolean nodeBlacklistingEnabled;
@@ -235,7 +238,7 @@ public abstract class RMContainerRequest
               ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
               zeroedRequest.setNumContainers(0);
               // to be sent to RM on next heartbeat
-              ask.add(zeroedRequest);
+              addResourceRequestToAsk(zeroedRequest);
             }
           }
           // if all requests were still in ask queue
@@ -320,7 +323,7 @@ public abstract class RMContainerRequest
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
 
     // Note this down for next interaction with ResourceManager
-    ask.add(remoteRequest);
+    addResourceRequestToAsk(remoteRequest);
     if (LOG.isDebugEnabled()) {
       LOG.debug("addResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
@@ -353,7 +356,12 @@ public abstract class RMContainerRequest
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
 
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    if(remoteRequest.getNumContainers() > 0) {
+      // based on blacklisting comments above, we can end up decrementing more
+      // than requested, so guard for that.
+      remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    }
+    
     if (remoteRequest.getNumContainers() == 0) {
       reqMap.remove(capability);
       if (reqMap.size() == 0) {
@@ -362,13 +370,12 @@ public abstract class RMContainerRequest
       if (remoteRequests.size() == 0) {
         remoteRequestsTable.remove(priority);
       }
-      //remove from ask if it may have
-      ask.remove(remoteRequest);
-    } else {
-      ask.add(remoteRequest);//this will override the request if ask doesn't
-      //already have it.
     }
 
+    // send the updated resource request to RM
+    // also send zero-container-count requests, to cancel previous requests
+    addResourceRequestToAsk(remoteRequest);
+
     if (LOG.isDebugEnabled()) {
       LOG.info("AFTER decResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
@@ -376,6 +383,16 @@ public abstract class RMContainerRequest
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
   }
+  
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // because objects inside the resource map can be deleted, ask can end up
+    // containing an object that matches a new resource object but with a
+    // different numContainers. So existing values must be replaced explicitly.
+    if(ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);    
+  }
 
   protected void release(ContainerId containerId) {
     release.add(containerId);
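
Two details in this file work together (MAPREDUCE-4671 in CHANGES.txt): the ask set now orders ResourceRequests with a comparator that ignores numContainers, and TreeSet.add() is a no-op when an "equal" element is already present, so an updated count (including the zero-container cancellations sent above) would be silently dropped unless the old element is removed first. That is why addResourceRequestToAsk removes before adding. A minimal sketch of the pitfall with a hypothetical Req type:

    import java.util.Comparator;
    import java.util.TreeSet;

    class ReplaceInTreeSetSketch {
      static class Req {
        final int priority;
        int numContainers;
        Req(int priority, int numContainers) {
          this.priority = priority; this.numContainers = numContainers;
        }
      }

      public static void main(String[] args) {
        // comparator deliberately ignores numContainers, mirroring the
        // ResourceRequestComparator used for the ask set
        TreeSet<Req> ask =
            new TreeSet<>(Comparator.comparingInt((Req r) -> r.priority));
        ask.add(new Req(20, 5));

        Req updated = new Req(20, 0);  // same request, count dropped to 0
        ask.add(updated);              // no-op: an "equal" element exists
        System.out.println(ask.first().numContainers); // still 5

        ask.remove(updated);           // remove-then-add replaces it
        ask.add(updated);
        System.out.println(ask.first().numContainers); // now 0
      }
    }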

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/TaskPage.java Sat Feb 16 01:12:07 2013
@@ -27,18 +27,11 @@ import static org.apache.hadoop.yarn.web
 
 import java.util.Collection;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskAttemptInfo;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.webapp.SubView;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
 import com.google.inject.Inject;
@@ -60,7 +53,7 @@ public class TaskPage extends AppView {
           h2($(TITLE));
         return;
       }
-      TBODY<TABLE<Hamlet>> tbody = html.
+      html.
       table("#attempts").
         thead().
           tr().
@@ -72,49 +65,46 @@ public class TaskPage extends AppView {
             th(".tsh", "Started").
             th(".tsh", "Finished").
             th(".tsh", "Elapsed").
-            th(".note", "Note")._()._().
-        tbody();
+            th(".note", "Note")._()._();
+      // Write all the data into a JavaScript array of arrays for JQuery
+      // DataTables to display
+      StringBuilder attemptsTableData = new StringBuilder("[\n");
+
       for (TaskAttempt attempt : getTaskAttempts()) {
         TaskAttemptInfo ta = new TaskAttemptInfo(attempt, true);
-        String taid = ta.getId();
         String progress = percent(ta.getProgress() / 100);
-        ContainerId containerId = ta.getAssignedContainerId();
 
         String nodeHttpAddr = ta.getNode();
-        long startTime = ta.getStartTime();
-        long finishTime = ta.getFinishTime();
-        long elapsed = ta.getElapsedTime();
         String diag = ta.getNote() == null ? "" : ta.getNote();
-        TR<TBODY<TABLE<Hamlet>>> row = tbody.tr();
-        TD<TR<TBODY<TABLE<Hamlet>>>> nodeTd = row.
-          td(".id", taid).
-          td(".progress", progress).
-          td(".state", ta.getState()).td();
-        if (nodeHttpAddr == null) {
-          nodeTd._("N/A");
-        } else {
-          nodeTd.
-            a(".nodelink", url(HttpConfig.getSchemePrefix(),
-                               nodeHttpAddr), nodeHttpAddr);
-        }
-        nodeTd._();
-        if (containerId != null) {
-          String containerIdStr = ta.getAssignedContainerIdStr();
-          row.td().
-              a(".logslink", url(HttpConfig.getSchemePrefix(),
-              nodeHttpAddr, "node", "containerlogs",
-              containerIdStr, app.getJob().getUserName()), "logs")._();
-        } else {
-          row.td()._("N/A")._();
-        }
-
-        row.
-          td(".ts", Times.format(startTime)).
-          td(".ts", Times.format(finishTime)).
-          td(".dt", StringUtils.formatTime(elapsed)).
-          td(".note", diag)._();
+        attemptsTableData.append("[\"")
+        .append(ta.getId()).append("\",\"")
+        .append(progress).append("\",\"")
+        .append(ta.getState().toString()).append("\",\"")
+
+        .append(nodeHttpAddr == null ? "N/A" :
+          "<a class='nodelink' href='" + HttpConfig.getSchemePrefix() + nodeHttpAddr + "'>"
+          + nodeHttpAddr + "</a>")
+        .append("\",\"")
+
+        .append(ta.getAssignedContainerId() == null ? "N/A" :
+          "<a class='logslink' href='" + url(HttpConfig.getSchemePrefix(), nodeHttpAddr, "node"
+            , "containerlogs", ta.getAssignedContainerIdStr(), app.getJob()
+            .getUserName()) + "'>logs</a>")
+          .append("\",\"")
+
+        .append(ta.getStartTime()).append("\",\"")
+        .append(ta.getFinishTime()).append("\",\"")
+        .append(ta.getElapsedTime()).append("\",\"")
+        .append(StringEscapeUtils.escapeJavaScript(StringEscapeUtils.escapeHtml(
+          diag))).append("\"],\n");
+      }
+      //Remove the last comma and close off the array of arrays
+      if(attemptsTableData.charAt(attemptsTableData.length() - 2) == ',') {
+        attemptsTableData.delete(attemptsTableData.length()-2, attemptsTableData.length()-1);
       }
-      tbody._()._();
+      attemptsTableData.append("]");
+      html.script().$type("text/javascript").
+      _("var attemptsTableData=" + attemptsTableData)._();
     }
 
     protected boolean isValidRequest() {
@@ -140,9 +130,24 @@ public class TaskPage extends AppView {
   }
 
   private String attemptsTableInit() {
-    return tableInit().
-        // Sort by id upon page load
-        append(", aaSorting: [[0, 'asc']]").
-        append("}").toString();
+    return tableInit()
+    .append(", 'aaData': attemptsTableData")
+    .append(", bDeferRender: true")
+    .append(", bProcessing: true")
+    .append("\n,aoColumnDefs:[\n")
+
+    // logs column should not be filterable (it includes container ID, which may pollute searches)
+    .append("\n{'aTargets': [ 4 ]")
+    .append(", 'bSearchable': false }")
+
+    .append("\n, {'sType':'numeric', 'aTargets': [ 5, 6")
+    .append(" ], 'mRender': renderHadoopDate }")
+
+    .append("\n, {'sType':'numeric', 'aTargets': [ 7")
+    .append(" ], 'mRender': renderHadoopElapsedTime }]")
+
+    // Sort by id upon page load
+    .append("\n, aaSorting: [[0, 'asc']]")
+    .append("}").toString();
   }
 }
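
The rewritten page emits each attempt row as a JavaScript array literal (MAPREDUCE-4989 in CHANGES.txt), so free-text cells such as the diagnostic note are escaped twice: escapeHtml so markup in the note cannot inject into the rendered cell, then escapeJavaScript so quotes cannot break out of the string literal. A minimal standalone sketch of that ordering, assuming commons-lang 2.x on the classpath:

    import org.apache.commons.lang.StringEscapeUtils;

    class EscapeRowSketch {
      // Build one DataTables row. Free-text cells are HTML-escaped first,
      // then JavaScript-escaped, so the value is inert both inside the
      // "..." string literal and in the rendered cell.
      static String row(String id, String note) {
        String safeNote = StringEscapeUtils.escapeJavaScript(
            StringEscapeUtils.escapeHtml(note));
        return "[\"" + id + "\",\"" + safeNote + "\"]";
      }

      public static void main(String[] args) {
        // angle brackets become entities, quotes are backslash-escaped
        System.out.println(row("attempt_0",
            "failed: <b>disk \"full\"</b>"));
      }
    }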

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Sat Feb 16 01:12:07 2013
@@ -167,6 +167,7 @@ public class TestRMContainerAllocator {
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
     dispatcher.await();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+    Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
     // send another request with different resource and priority
     ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
@@ -178,7 +179,8 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     dispatcher.await();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
+    Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
+    
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
@@ -187,8 +189,100 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     dispatcher.await();
+    Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
+    
+    // check that the assigned container requests are cancelled
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());    
+  }
+  
+  @Test 
+  public void testMapNodeLocality() throws Exception {
+    // test checks that ordering of allocated containers list from the RM does 
+    // not affect the map->container assignment done by the AM. If there is a 
+    // node local container available for a map then it should be assigned to 
+    // that container and not a rack-local container that happened to be seen 
+    // earlier in the allocated containers list from the RM.
+    // Regression test for MAPREDUCE-4893
+    LOG.info("Running testMapNodeLocality");
+
+    Configuration conf = new Configuration();
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, 
+            0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps 
+    rm.registerNode("h2:1234", 10240); // won't heartbeat on the node-local node
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
+    dispatcher.await();
+
+    // create the container requests for maps
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+    ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event2);
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h2" });
+    allocator.sendRequest(event3);
+
+    // this tells the scheduler about the requests
+    // as nodes are not added, no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // update resources in scheduler
+    // Node heartbeat from rack-local first. This makes node h3 the first in the
+    // list of allocated containers but it should not be assigned to task1.
+    nodeManager3.nodeHeartbeat(true);
+    // Node heartbeat from node-local next. This allocates 2 node local 
+    // containers for task1 and task2. These should be matched with those tasks.
+    nodeManager1.nodeHeartbeat(true);
+    dispatcher.await();
+
+    assigned = allocator.schedule();
+    dispatcher.await();
+    checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+        assigned, false);
+    // remove the rack-local assignment that should have happened for task3
+    for(TaskAttemptContainerAssignedEvent event : assigned) {
+      if(event.getTaskAttemptID().equals(event3.getAttemptID())) {
+        assigned.remove(event);
+        Assert.assertTrue(
+                    event.getContainer().getNodeId().getHost().equals("h3"));
+        break;
+      }
+    }
+    checkAssignments(new ContainerRequestEvent[] { event1, event2},
+        assigned, true);
   }
 
   @Test
@@ -336,7 +430,7 @@ public class TestRMContainerAllocator {
   }
 
   private static class MyResourceManager extends MockRM {
-
+    
     public MyResourceManager(Configuration conf) {
       super(conf);
     }
@@ -360,6 +454,10 @@ public class TestRMContainerAllocator {
     protected ResourceScheduler createScheduler() {
       return new MyFifoScheduler(this.getRMContext());
     }
+    
+    MyFifoScheduler getMyFifoScheduler() {
+      return (MyFifoScheduler) scheduler;
+    }
   }
 
   @Test
@@ -1108,7 +1206,9 @@ public class TestRMContainerAllocator {
         assert (false);
       }
     }
-
+    
+    List<ResourceRequest> lastAsk = null;
+    
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
     @Override
@@ -1122,6 +1222,7 @@ public class TestRMContainerAllocator {
             .getNumContainers());
         askCopy.add(reqCopy);
       }
+      lastAsk = ask;
       return super.allocate(applicationAttemptId, askCopy, release);
     }
   }
@@ -1202,7 +1303,7 @@ public class TestRMContainerAllocator {
     if (checkHostMatch) {
       Assert.assertTrue("Not assigned to requested host", Arrays.asList(
           request.getHosts()).contains(
-          assigned.getContainer().getNodeId().toString()));
+          assigned.getContainer().getNodeId().getHost()));
     }
   }
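
The new lastAsk bookkeeping above relies on MyFifoScheduler's existing defensive copy: FifoScheduler mutates numContainers in place, so the test forwards copies and keeps the caller's original request objects intact for assertions. A minimal sketch of the defensive-copy idea with a hypothetical Req type:

    import java.util.ArrayList;
    import java.util.List;

    class DefensiveCopySketch {
      static class Req {
        int numContainers;
        Req(int n) { numContainers = n; }
      }

      // Keep the caller's objects intact: the callee mutates what it is
      // given, so hand it copies and retain the originals for assertions.
      static List<Req> recordAndForward(List<Req> ask) {
        List<Req> askCopy = new ArrayList<>();
        for (Req r : ask) {
          askCopy.add(new Req(r.numContainers));
        }
        schedule(askCopy);  // mutates only the copies
        return ask;         // e.g. lastAsk, still holding original counts
      }

      static void schedule(List<Req> reqs) {
        for (Req r : reqs) { r.numContainers = 0; } // stand-in scheduler
      }

      public static void main(String[] args) {
        List<Req> ask = new ArrayList<>();
        ask.add(new Req(4));
        System.out.println(recordAndForward(ask).get(0).numContainers); // 4
      }
    }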
 

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java Sat Feb 16 01:12:07 2013
@@ -50,11 +50,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -734,12 +738,173 @@ public class TestRecovery {
     app.verifyCompleted();
     validateOutput();
   }
-  
+
+  /**
+   * AM with 2 maps and 1 reduce. For 1st map, one attempt fails, one attempt
+   * completely disappears because of failed launch, one attempt gets killed and
+   * one attempt succeeds. AM crashes after the first task finishes and
+   * recovers completely and succeeds in the second generation.
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void testSpeculative() throws Exception {
+
+    int runCount = 0;
+    long am1StartTimeEst = System.currentTimeMillis();
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    long jobStartTime = job.getReport().getStartTime();
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    // Launch a Speculative Task for the first Task
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(mapTask1.getID(), TaskEventType.T_ADD_SPEC_ATTEMPT));
+    int timeOut = 0;
+    while (mapTask1.getAttempts().size() != 2 && timeOut++ < 10) {
+      Thread.sleep(1000);
+      LOG.info("Waiting for next attempt to start");
+    }
+    Iterator<TaskAttempt> t1it = mapTask1.getAttempts().values().iterator();
+    TaskAttempt task1Attempt1 = t1it.next();
+    TaskAttempt task1Attempt2 = t1it.next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+
+    ContainerId t1a2contId = task1Attempt2.getAssignedContainerID();
+
+    LOG.info(t1a2contId.toString());
+    LOG.info(task1Attempt1.getID().toString());
+    LOG.info(task1Attempt2.getID().toString());
+
+    // Launch container for speculative attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(task1Attempt2.getID(), runCount));
+
+    // before sending the TA_DONE event, make sure the attempt has come to
+    // RUNNING state
+    app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
+    app.waitForState(task1Attempt2, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    // the reduce must be in RUNNING state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.RUNNING, reduceTask.getReport().getTaskState());
+
+    //send the done signal to the map 1 attempt 1
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task1Attempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(task1Attempt1, TaskAttemptState.SUCCEEDED);
+
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    long task1StartTime = mapTask1.getReport().getStartTime();
+    long task1FinishTime = mapTask1.getReport().getFinishTime();
+
+    //stop the app
+    app.stop();
+
+    //rerun
+    //in rerun the 1st map will be recovered from previous run
+    long am2StartTimeEst = System.currentTimeMillis();
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false, ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean("mapred.mapper.new-api", true);
+    conf.setBoolean("mapred.reducer.new-api", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("No of tasks not correct",
+       3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    reduceTask = it.next();
+
+    // first map will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    //before sending the TA_DONE event, make sure the attempt has come to
+    //the RUNNING state
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+
+    //send the done signal to the 2nd map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    //wait for the second map task to complete
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    //wait for reduce to be running before sending done
+    app.waitForState(reduceTask, TaskState.RUNNING);
+
+    //send the done signal to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals("Job Start time not correct",
+        jobStartTime, job.getReport().getStartTime());
+    Assert.assertEquals("Task Start time not correct",
+        task1StartTime, mapTask1.getReport().getStartTime());
+    Assert.assertEquals("Task Finish time not correct",
+        task1FinishTime, mapTask1.getReport().getFinishTime());
+    Assert.assertEquals(2, job.getAMInfos().size());
+    int attemptNum = 1;
+    // Verify AMInfo
+    for (AMInfo amInfo : job.getAMInfos()) {
+      Assert.assertEquals(attemptNum++, amInfo.getAppAttemptId()
+          .getAttemptId());
+      Assert.assertEquals(amInfo.getAppAttemptId(), amInfo.getContainerId()
+          .getApplicationAttemptId());
+      Assert.assertEquals(MRApp.NM_HOST, amInfo.getNodeManagerHost());
+      Assert.assertEquals(MRApp.NM_PORT, amInfo.getNodeManagerPort());
+      Assert.assertEquals(MRApp.NM_HTTP_PORT, amInfo.getNodeManagerHttpPort());
+    }
+    long am1StartTimeReal = job.getAMInfos().get(0).getStartTime();
+    long am2StartTimeReal = job.getAMInfos().get(1).getStartTime();
+    Assert.assertTrue(am1StartTimeReal >= am1StartTimeEst
+        && am1StartTimeReal <= am2StartTimeEst);
+    Assert.assertTrue(am2StartTimeReal >= am2StartTimeEst
+        && am2StartTimeReal <= System.currentTimeMillis());
+
+  }
+
   private void writeBadOutput(TaskAttempt attempt, Configuration conf)
   throws Exception {
   TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, 
       TypeConverter.fromYarn(attempt.getID()));
-  
+ 
   TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
   RecordWriter theRecordWriter = theOutputFormat
       .getRecordWriter(tContext);

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java Sat Feb 16 01:12:07 2013
@@ -35,6 +35,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskCounter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
@@ -52,7 +55,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
@@ -60,7 +62,7 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
 import org.junit.Before;
@@ -143,6 +145,7 @@ public class TestTaskImpl {
     private float progress = 0;
     private TaskAttemptState state = TaskAttemptState.NEW;
     private TaskType taskType;
+    private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS;
 
     public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler,
         TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
@@ -178,7 +181,15 @@ public class TestTaskImpl {
     public TaskAttemptState getState() {
       return state;
     }
-    
+
+    @Override
+    public Counters getCounters() {
+      return attemptCounters;
+    }
+
+    public void setCounters(Counters counters) {
+      attemptCounters = counters;
+    }
   }
   
   private class MockTask extends Task {
@@ -687,4 +698,49 @@ public class TestTaskImpl {
         TaskEventType.T_ATTEMPT_KILLED));
     assertEquals(TaskState.FAILED, mockTask.getState());
   }
+
+  @Test
+  public void testCountersWithSpeculation() {
+    mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),
+        remoteJobConfFile, conf, taskAttemptListener, jobToken,
+        credentials, clock,
+        completedTasksFromPreviousRun, startCount,
+        metrics, appContext, TaskType.MAP) {
+          @Override
+          protected int getMaxAttempts() {
+            return 1;
+          }
+    };
+    TaskId taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl baseAttempt = getLastAttempt();
+
+    // add a speculative attempt
+    mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    launchTaskAttempt(getLastAttempt().getAttemptId());
+    updateLastAttemptState(TaskAttemptState.RUNNING);
+    MockTaskAttemptImpl specAttempt = getLastAttempt();
+    assertEquals(2, taskAttempts.size());
+
+    Counters specAttemptCounters = new Counters();
+    Counter cpuCounter = specAttemptCounters.findCounter(
+        TaskCounter.CPU_MILLISECONDS);
+    cpuCounter.setValue(1000);
+    specAttempt.setCounters(specAttemptCounters);
+
+    // have the spec attempt succeed while the base attempt is also at 1.0 progress
+    commitTaskAttempt(specAttempt.getAttemptId());
+    specAttempt.setProgress(1.0f);
+    specAttempt.setState(TaskAttemptState.SUCCEEDED);
+    mockTask.handle(new TaskTAttemptEvent(specAttempt.getAttemptId(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+    assertEquals(TaskState.SUCCEEDED, mockTask.getState());
+    baseAttempt.setProgress(1.0f);
+
+    Counters taskCounters = mockTask.getCounters();
+    assertEquals("wrong counters for task", specAttemptCounters, taskCounters);
+  }
 }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/pom.xml Sat Feb 16 01:12:07 2013
@@ -54,63 +54,30 @@
   <build>
     <plugins>
       <plugin>
-        <artifactId>maven-antrun-plugin</artifactId>
+        <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-maven-plugins</artifactId>
         <executions>
           <execution>
-            <id>create-protobuf-generated-sources-directory</id>
-            <phase>initialize</phase>
-            <configuration>
-              <target>
-                <mkdir dir="target/generated-sources/proto" />
-              </target>
-            </configuration>
-            <goals>
-              <goal>run</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>generate-sources</id>
-            <phase>generate-sources</phase>
-            <configuration>
-              <executable>protoc</executable>
-              <arguments>
-                <argument>-I../../../hadoop-common-project/hadoop-common/src/main/proto/</argument>
-                <argument>-I../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/</argument>
-                <argument>-Isrc/main/proto/</argument>
-                <argument>--java_out=target/generated-sources/proto</argument>
-                <argument>src/main/proto/mr_protos.proto</argument>
-                <argument>src/main/proto/mr_service_protos.proto</argument>
-                <argument>src/main/proto/MRClientProtocol.proto</argument>
-              </arguments>
-            </configuration>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
-
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>add-source</id>
+            <id>compile-protoc</id>
             <phase>generate-sources</phase>
             <goals>
-              <goal>add-source</goal>
+              <goal>protoc</goal>
             </goals>
             <configuration>
-              <sources>
-                <source>target/generated-sources/proto</source>
-              </sources>
+              <imports>
+                <param>${basedir}/../../../hadoop-common-project/hadoop-common/src/main/proto</param>
+                <param>${basedir}/../../../hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto</param>
+                <param>${basedir}/src/main/proto</param>
+              </imports>
+              <source>
+                <directory>${basedir}/src/main/proto</directory>
+                <includes>
+                  <include>mr_protos.proto</include>
+                  <include>mr_service_protos.proto</include>
+                  <include>MRClientProtocol.proto</include>
+                </includes>
+              </source>
+              <output>${project.build.directory}/generated-sources/java</output>
             </configuration>
           </execution>
         </executions>

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalClientProtocolProvider.java Sat Feb 16 01:12:07 2013
@@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocol;
 import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider;
-import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 
 @InterfaceAudience.Private
 public class LocalClientProtocolProvider extends ClientProtocolProvider {
@@ -38,16 +37,11 @@ public class LocalClientProtocolProvider
     if (!MRConfig.LOCAL_FRAMEWORK_NAME.equals(framework)) {
       return null;
     }
-    String tracker = conf.get(JTConfig.JT_IPC_ADDRESS, "local");
-    if ("local".equals(tracker)) {
+    if (conf.get("mapreduce.job.maps") == null) {
       conf.setInt("mapreduce.job.maps", 1);
-      return new LocalJobRunner(conf);
-    } else {
-
-      throw new IOException("Invalid \"" + JTConfig.JT_IPC_ADDRESS
-          + "\" configuration value for LocalJobRunner : \""
-          + tracker + "\"");
     }
+
+    return new LocalJobRunner(conf);
   }
 
   @Override

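With this change, the framework name alone selects LocalJobRunner, and
mapreduce.job.maps is defaulted to 1 only when the job has not set it. A
minimal client-side sketch under those assumptions (the config keys are the
ones in the hunk above; this is not part of the commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRConfig;
    import org.apache.hadoop.mapreduce.protocol.ClientProtocol;

    Configuration conf = new Configuration();
    // the framework name alone now selects the local runner; the old
    // JTConfig.JT_IPC_ADDRESS "local" check is gone
    conf.set(MRConfig.FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    ClientProtocol client = new LocalClientProtocolProvider().create(conf);
    // mapreduce.job.maps is left alone if the job already configured it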
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/pom.xml Sat Feb 16 01:12:07 2013
@@ -68,24 +68,6 @@
         </executions>
       </plugin>
       <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>build-helper-maven-plugin</artifactId>
-        <executions>
-          <execution>
-            <id>add-source</id>
-            <phase>generate-sources</phase>
-            <goals>
-              <goal>add-source</goal>
-            </goals>
-            <configuration>
-              <sources>
-                <source>target/generated-sources/avro</source>
-              </sources>
-            </configuration>
-          </execution>
-        </executions>
-      </plugin>
-      <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-antrun-plugin</artifactId>
         <executions>

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Counters.java Sat Feb 16 01:12:07 2013
@@ -230,6 +230,10 @@ public class Counters
   public static class Group implements CounterGroupBase<Counter> {
     private CounterGroupBase<Counter> realGroup;
     
+    protected Group() {
+      realGroup = null;
+    }
+    
     Group(GenericGroup group) {
       this.realGroup = group;
     }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java Sat Feb 16 01:12:07 2013
@@ -92,7 +92,7 @@ public class FileOutputCommitter extends
   }
 
   @Private
-  Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
+  public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
     Path out = getOutputPath(context);
     return out == null ? null : getTaskAttemptPath(context, out);
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputFormat.java Sat Feb 16 01:12:07 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.text.NumberFormat;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -152,8 +153,8 @@ public abstract class FileOutputFormat<K
    * @param outputDir the {@link Path} of the output directory 
    * for the map-reduce job.
    */
-  
-  static void setWorkOutputPath(JobConf conf, Path outputDir) {
+  @Private
+  public static void setWorkOutputPath(JobConf conf, Path outputDir) {
     outputDir = new Path(conf.getWorkingDirectory(), outputDir);
     conf.set(JobContext.TASK_OUTPUT_DIR, outputDir.toString());
   }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobConf.java Sat Feb 16 01:12:07 2013
@@ -28,6 +28,7 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -419,7 +420,8 @@ public class JobConf extends Configurati
     return credentials;
   }
   
-  void setCredentials(Credentials credentials) {
+  @Private
+  public void setCredentials(Credentials credentials) {
     this.credentials = credentials;
   }
   

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/pipes/Submitter.java Sat Feb 16 01:12:07 2013
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapred.lib.Lazy
 import org.apache.hadoop.mapred.lib.NullOutputFormat;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.Tool;
 
@@ -515,7 +516,7 @@ public class Submitter extends Configure
    */
   public static void main(String[] args) throws Exception {
     int exitCode =  new Submitter().run(args);
-    System.exit(exitCode);
+    ExitUtil.terminate(exitCode);
   }
 
 }

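Using ExitUtil.terminate() instead of System.exit() preserves the exit-code
behavior in production but gives tests a seam: with exits disabled,
terminate() throws instead of killing the JVM. A sketch of that test-side
pattern, assuming ExitUtil's standard hooks (disableSystemExit() and
ExitException); the argument list here is hypothetical:

    import org.apache.hadoop.util.ExitUtil;

    ExitUtil.disableSystemExit();
    try {
      Submitter.main(new String[] {});   // would previously kill the test JVM
    } catch (ExitUtil.ExitException e) {
      // terminate() throws here; e.status carries the would-be exit code
    }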
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FileSystemCounterGroup.java Sat Feb 16 01:12:07 2013
@@ -35,6 +35,7 @@ import com.google.common.collect.Iterato
 import com.google.common.collect.Maps;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.FileSystemCounter;
@@ -72,6 +73,16 @@ public abstract class FileSystemCounterG
       this.scheme = scheme;
       key = ref;
     }
+    
+    @Private
+    public String getScheme() {
+      return scheme;
+    }
+    
+    @Private
+    public FileSystemCounter getFileSystemCounter() {
+      return key;
+    }
 
     @Override
     public String getName() {

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/counters/FrameworkCounterGroup.java Sat Feb 16 01:12:07 2013
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.util.ResourceBundles;
@@ -66,7 +67,17 @@ public abstract class FrameworkCounterGr
       key = ref;
       this.groupName = groupName;
     }
+    
+    @Private
+    public T getKey() {
+      return key;
+    }
 
+    @Private
+    public String getGroupName() {
+      return groupName;
+    }
+    
     @Override
     public String getName() {
       return key.name();

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java Sat Feb 16 01:12:07 2013
@@ -246,6 +246,7 @@ public class JobHistoryParser implements
     attemptInfo.state = StringInterner.weakIntern(event.getState());
     attemptInfo.counters = event.getCounters();
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleReduceAttemptFinishedEvent
@@ -262,6 +263,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleMapAttemptFinishedEvent(MapAttemptFinishedEvent event) {
@@ -276,6 +278,7 @@ public class JobHistoryParser implements
     attemptInfo.hostname = StringInterner.weakIntern(event.getHostname());
     attemptInfo.port = event.getPort();
     attemptInfo.rackname = StringInterner.weakIntern(event.getRackName());
+    info.completedTaskAttemptsMap.put(event.getAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptFailedEvent(
@@ -306,6 +309,7 @@ public class JobHistoryParser implements
         taskInfo.successfulAttemptId = null;
       }
     }
+    info.completedTaskAttemptsMap.put(event.getTaskAttemptId(), attemptInfo);
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -443,6 +447,7 @@ public class JobHistoryParser implements
     Map<JobACL, AccessControlList> jobACLs;
     
     Map<TaskID, TaskInfo> tasksMap;
+    Map<TaskAttemptID, TaskAttemptInfo> completedTaskAttemptsMap;
     List<AMInfo> amInfos;
     AMInfo latestAmInfo;
     boolean uberized;
@@ -456,6 +461,7 @@ public class JobHistoryParser implements
       finishedMaps = finishedReduces = 0;
       username = jobname = jobConfPath = jobQueueName = "";
       tasksMap = new HashMap<TaskID, TaskInfo>();
+      completedTaskAttemptsMap = new HashMap<TaskAttemptID, TaskAttemptInfo>();
       jobACLs = new HashMap<JobACL, AccessControlList>();
       priority = JobPriority.NORMAL;
     }
@@ -530,6 +536,8 @@ public class JobHistoryParser implements
     public Counters getReduceCounters() { return reduceCounters; }
     /** @return the map of all tasks in this job */
     public Map<TaskID, TaskInfo> getAllTasks() { return tasksMap; }
+    /** @return the map of all completed task attempts in this job */
+    public Map<TaskAttemptID, TaskAttemptInfo> getAllCompletedTaskAttempts() { return completedTaskAttemptsMap; }
     /** @return the priority of this job */
     public String getPriority() { return priority.toString(); }
     public Map<JobACL, AccessControlList> getJobACLs() { return jobACLs; }

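The new completedTaskAttemptsMap lets history consumers reach every finished
attempt directly instead of walking each TaskInfo's attempt map. A minimal
sketch, assuming a FileSystem handle (fs) and a job history file Path
(historyFile) are already in hand:

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
    import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;

    JobHistoryParser parser = new JobHistoryParser(fs, historyFile);
    JobInfo info = parser.parse();
    for (TaskAttemptInfo attempt : info.getAllCompletedTaskAttempts().values()) {
      // each entry was recorded by one of the *AttemptFinished/Failed handlers above
      System.out.println(attempt.getAttemptId() + " finished on " + attempt.getHostname());
    }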
Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/MapAttemptFinishedEvent.java Sat Feb 16 01:12:07 2013
@@ -178,7 +178,7 @@ public class MapAttemptFinishedEvent  im
 
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get the task status */
   public String getTaskStatus() { return taskStatus.toString(); }

Modified: hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java?rev=1446832&r1=1446831&r2=1446832&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java (original)
+++ hadoop/common/branches/HDFS-347/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ReduceAttemptFinishedEvent.java Sat Feb 16 01:12:07 2013
@@ -176,11 +176,11 @@ public class ReduceAttemptFinishedEvent 
   public TaskID getTaskId() { return attemptId.getTaskID(); }
   /** Get the attempt id */
   public TaskAttemptID getAttemptId() {
-    return TaskAttemptID.forName(attemptId.toString());
+    return attemptId;
   }
   /** Get the task type */
   public TaskType getTaskType() {
-    return TaskType.valueOf(taskType.toString());
+    return taskType;
   }
   /** Get the task status */
   public String getTaskStatus() { return taskStatus.toString(); }

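Both accessors had been rebuilding an already-typed field by formatting it to
a String and parsing it back on every call. Returning the field directly is
behaviorally identical but skips the allocation and parse. The same pattern in
isolation, on a hypothetical event class:

    // before: format-and-parse round trip on every call
    public TaskType getTaskType() {
      return TaskType.valueOf(taskType.toString());
    }

    // after: the field is already a TaskType
    public TaskType getTaskType() {
      return taskType;
    }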

