hadoop-mapreduce-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1231834 [2/7] - in /hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project: ./ bin/ conf/ hadoop-mapreduce-client/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/ja...
Date Mon, 16 Jan 2012 04:24:34 GMT
Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskHeartbeatHandler.java Mon Jan 16 04:24:24 2012
@@ -18,13 +18,15 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
@@ -40,6 +42,7 @@ import org.apache.hadoop.yarn.service.Ab
  * not hear from it for a long time.
  * 
  */
+@SuppressWarnings({"unchecked", "rawtypes"})
 public class TaskHeartbeatHandler extends AbstractService {
 
   private static final Log LOG = LogFactory.getLog(TaskHeartbeatHandler.class);
@@ -48,24 +51,29 @@ public class TaskHeartbeatHandler extend
   //received from a task.
   private Thread lostTaskCheckerThread;
   private volatile boolean stopped;
-  private int taskTimeOut = 5*60*1000;//5 mins
+  private int taskTimeOut = 5 * 60 * 1000; // 5 mins
+  private int taskTimeOutCheckInterval = 30 * 1000; // 30 seconds.
 
   private final EventHandler eventHandler;
   private final Clock clock;
 
-  private Map<TaskAttemptId, Long> runningAttempts 
-    = new HashMap<TaskAttemptId, Long>();
+  private ConcurrentMap<TaskAttemptId, Long> runningAttempts;
 
-  public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock) {
+  public TaskHeartbeatHandler(EventHandler eventHandler, Clock clock,
+      int numThreads) {
     super("TaskHeartbeatHandler");
     this.eventHandler = eventHandler;
     this.clock = clock;
+    runningAttempts =
+      new ConcurrentHashMap<TaskAttemptId, Long>(16, 0.75f, numThreads);
   }
 
   @Override
   public void init(Configuration conf) {
-   super.init(conf);
-   taskTimeOut = conf.getInt("mapreduce.task.timeout", 5*60*1000);
+    super.init(conf);
+    taskTimeOut = conf.getInt(MRJobConfig.TASK_TIMEOUT, 5 * 60 * 1000);
+    taskTimeOutCheckInterval =
+        conf.getInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 30 * 1000);
   }
 
   @Override
@@ -83,18 +91,17 @@ public class TaskHeartbeatHandler extend
     super.stop();
   }
 
-  public synchronized void receivedPing(TaskAttemptId attemptID) {
-    //only put for the registered attempts
-    if (runningAttempts.containsKey(attemptID)) {
-      runningAttempts.put(attemptID, clock.getTime());
-    }
+  public void receivedPing(TaskAttemptId attemptID) {
+    // only put for the registered attempts
+    //TODO throw an exception if the task isn't registered.
+    runningAttempts.replace(attemptID, clock.getTime());
   }
 
-  public synchronized void register(TaskAttemptId attemptID) {
+  public void register(TaskAttemptId attemptID) {
     runningAttempts.put(attemptID, clock.getTime());
   }
 
-  public synchronized void unregister(TaskAttemptId attemptID) {
+  public void unregister(TaskAttemptId attemptID) {
     runningAttempts.remove(attemptID);
   }
 
@@ -103,36 +110,40 @@ public class TaskHeartbeatHandler extend
     @Override
     public void run() {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
-        synchronized (TaskHeartbeatHandler.this) {
-          Iterator<Map.Entry<TaskAttemptId, Long>> iterator = 
+        Iterator<Map.Entry<TaskAttemptId, Long>> iterator =
             runningAttempts.entrySet().iterator();
 
-          //avoid calculating current time everytime in loop
-          long currentTime = clock.getTime();
+        // avoid calculating current time every time in the loop
+        long currentTime = clock.getTime();
 
-          while (iterator.hasNext()) {
-            Map.Entry<TaskAttemptId, Long> entry = iterator.next();
-            if (currentTime > entry.getValue() + taskTimeOut) {
-              //task is lost, remove from the list and raise lost event
+        while (iterator.hasNext()) {
+          Map.Entry<TaskAttemptId, Long> entry = iterator.next();
+          if (currentTime > entry.getValue() + taskTimeOut) {
+
+            // In case the iterator isn't picking up the latest.
+            // Extra lookup outside of the iterator - but only if the task
+            // is considered to be timed out.
+            Long taskTime = runningAttempts.get(entry.getKey());
+            if (taskTime != null && currentTime > taskTime + taskTimeOut) {
+              // task is lost, remove from the list and raise lost event
               iterator.remove();
-              eventHandler.handle(
-                  new TaskAttemptDiagnosticsUpdateEvent(entry.getKey(),
-                      "AttemptID:" + entry.getKey().toString() + 
-                      " Timed out after " + taskTimeOut/1000 + " secs"));
-              eventHandler.handle(new TaskAttemptEvent(entry
-                  .getKey(), TaskAttemptEventType.TA_TIMED_OUT));
+              eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
+                  .getKey(), "AttemptID:" + entry.getKey().toString()
+                  + " Timed out after " + taskTimeOut / 1000 + " secs"));
+              eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
+                  TaskAttemptEventType.TA_TIMED_OUT));
             }
+
           }
         }
         try {
-          Thread.sleep(taskTimeOut);
+          Thread.sleep(taskTimeOutCheckInterval);
         } catch (InterruptedException e) {
           LOG.info("TaskHeartbeatHandler thread interrupted");
           break;
         }
       }
     }
-    
   }
 
 }
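
The change above swaps a synchronized HashMap for a ConcurrentHashMap so pings, registrations, and the lost-task checker no longer serialize on the handler: replace() only updates attempts that are still registered (the lock-free equivalent of the old containsKey()+put() pair), and because the map's iterator is only weakly consistent, the checker re-reads an entry before declaring the task lost. A minimal self-contained sketch of the same pattern, with illustrative names rather than the actual Hadoop classes:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    public class HeartbeatTracker {
      private final ConcurrentMap<String, Long> lastSeen =
          new ConcurrentHashMap<String, Long>();

      public void register(String id) {
        lastSeen.put(id, System.currentTimeMillis());
      }

      public void unregister(String id) {
        lastSeen.remove(id);
      }

      // replace() is a no-op for unknown keys, so an unregistered attempt
      // cannot re-insert itself.
      public void receivedPing(String id) {
        lastSeen.replace(id, System.currentTimeMillis());
      }

      public void expireOlderThan(long timeoutMs) {
        long now = System.currentTimeMillis();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
          if (now > entry.getValue() + timeoutMs) {
            // The weakly consistent iterator may hold a stale timestamp;
            // re-read before declaring the task lost, as the patch does.
            Long latest = lastSeen.get(entry.getKey());
            if (latest != null && now > latest + timeoutMs) {
              lastSeen.remove(entry.getKey(), latest); // only if unchanged
              System.out.println(entry.getKey() + " timed out");
            }
          }
        }
      }
    }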

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java Mon Jan 16 04:24:24 2012
@@ -31,11 +31,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.FailTaskAttemptResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetCountersResponse;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDiagnosticsResponse;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
@@ -223,7 +226,7 @@ public class MRClientService extends Abs
       Job job = verifyAndGetJob(jobId, false);
       GetCountersResponse response =
         recordFactory.newRecordInstance(GetCountersResponse.class);
-      response.setCounters(job.getCounters());
+      response.setCounters(TypeConverter.toYarn(job.getAllCounters()));
       return response;
     }
     
@@ -237,8 +240,7 @@ public class MRClientService extends Abs
       response.setJobReport(job.getReport());
       return response;
     }
-    
-    
+
     @Override
     public GetTaskAttemptReportResponse getTaskAttemptReport(
         GetTaskAttemptReportRequest request) throws YarnRemoteException {
@@ -356,6 +358,8 @@ public class MRClientService extends Abs
       return response;
     }
 
+    private final Object getTaskReportsLock = new Object();
+
     @Override
     public GetTaskReportsResponse getTaskReports(
         GetTaskReportsRequest request) throws YarnRemoteException {
@@ -366,13 +370,26 @@ public class MRClientService extends Abs
         recordFactory.newRecordInstance(GetTaskReportsResponse.class);
       
       Job job = verifyAndGetJob(jobId, false);
-      LOG.info("Getting task report for " + taskType + "   " + jobId);
       Collection<Task> tasks = job.getTasks(taskType).values();
-      LOG.info("Getting task report size " + tasks.size());
-      for (Task task : tasks) {
-        response.addTaskReport(task.getReport());
-	  }
+      LOG.info("Getting task report for " + taskType + "   " + jobId
+          + ". Report-size will be " + tasks.size());
+
+      // Take lock to allow only one call, otherwise heap will blow up because
+      // of counters in the report when there are multiple callers.
+      synchronized (getTaskReportsLock) {
+        for (Task task : tasks) {
+          response.addTaskReport(task.getReport());
+        }
+      }
+
       return response;
     }
+
+    @Override
+    public GetDelegationTokenResponse getDelegationToken(
+        GetDelegationTokenRequest request) throws YarnRemoteException {
+      throw RPCUtil.getRemoteException("MR AM not authorized to issue delegation"
+          + " token");
+    }
   }
 }
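
In getTaskReports() above, report construction is serialized behind a dedicated lock object so that only one RPC caller at a time materializes the counter-heavy task reports, bounding how many of them are alive on the heap at once. A minimal sketch of the idiom; the report type and builder here are hypothetical stand-ins:

    import java.util.ArrayList;
    import java.util.List;

    public class ReportService {
      private final Object reportsLock = new Object();

      public List<String> getReports(List<String> taskIds) {
        List<String> reports = new ArrayList<String>();
        // One caller at a time builds the expensive reports; concurrent
        // callers queue here instead of each allocating a full report set.
        synchronized (reportsLock) {
          for (String id : taskIds) {
            reports.add(buildExpensiveReport(id));
          }
        }
        return reports;
      }

      private String buildExpensiveReport(String id) {
        return "report-for-" + id; // stand-in for a counter-laden report
      }
    }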

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Job.java Mon Jan 16 04:24:24 2012
@@ -22,9 +22,9 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,7 +44,15 @@ public interface Job {
   String getName();
   JobState getState();
   JobReport getReport();
-  Counters getCounters();
+
+  /**
+   * Get all the counters of this job. This includes job-counters aggregated
+   * together with the counters of each task. This creates a clone of the
+   * Counters, so use this judiciously.  
+   * @return job-counters and aggregate task-counters
+   */
+  Counters getAllCounters();
+
   Map<TaskId,Task> getTasks();
   Map<TaskId,Task> getTasks(TaskType taskType);
   Task getTask(TaskId taskID);

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/Task.java Mon Jan 16 04:24:24 2012
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.Map;
 
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttempt.java Mon Jan 16 04:24:24 2012
@@ -20,7 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.List;
 
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptStatusUpdateEvent.java Mon Jan 16 04:24:24 2012
@@ -20,12 +20,11 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import java.util.List;
 
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
 
-
 public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
 
   private TaskAttemptStatus reportedTaskAttemptStatus;

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Jan 16 04:24:24 2012
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobACLsManager;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobACL;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -61,9 +62,6 @@ import org.apache.hadoop.mapreduce.split
 import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
 import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
-import org.apache.hadoop.mapreduce.v2.api.records.Counter;
-import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -99,7 +97,6 @@ import org.apache.hadoop.yarn.Clock;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
 import org.apache.hadoop.yarn.state.SingleArcTransition;
@@ -109,10 +106,13 @@ import org.apache.hadoop.yarn.state.Stat
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */
-@SuppressWarnings({ "rawtypes", "deprecation" })
+@SuppressWarnings({ "rawtypes", "deprecation", "unchecked" })
 public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job, 
   EventHandler<JobEvent> {
 
+  private static final TaskAttemptCompletionEvent[]
+    EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS = new TaskAttemptCompletionEvent[0];
+
   private static final Log LOG = LogFactory.getLog(JobImpl.class);
 
   //The maximum fraction of fetch failures allowed for a map
@@ -152,7 +152,7 @@ public class JobImpl implements org.apac
 
   private boolean lazyTasksCopyNeeded = false;
   volatile Map<TaskId, Task> tasks = new LinkedHashMap<TaskId, Task>();
-  private Counters jobCounters = newCounters();
+  private Counters jobCounters = new Counters();
     // FIXME:  
     //
     // Can then replace task-level uber counters (MR-2424) with job-level ones
@@ -475,88 +475,29 @@ public class JobImpl implements org.apac
   }
 
   @Override
-  public Counters getCounters() {
-    Counters counters = newCounters();
+  public Counters getAllCounters() {
+    Counters counters = new Counters();
     readLock.lock();
     try {
-      incrAllCounters(counters, jobCounters);
+      counters.incrAllCounters(jobCounters);
       return incrTaskCounters(counters, tasks.values());
     } finally {
       readLock.unlock();
     }
   }
 
-  private Counters getTypeCounters(Set<TaskId> taskIds) {
-    Counters counters = newCounters();
-    for (TaskId taskId : taskIds) {
-      Task task = tasks.get(taskId);
-      incrAllCounters(counters, task.getCounters());
-    }
-    return counters;
-  }
-
-  private Counters getMapCounters() {
-    readLock.lock();
-    try {
-      return getTypeCounters(mapTasks);
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  private Counters getReduceCounters() {
-    readLock.lock();
-    try {
-      return getTypeCounters(reduceTasks);
-    } finally {
-      readLock.unlock();
-    }
-  }
-  
-  public static Counters newCounters() {
-    Counters counters = RecordFactoryProvider.getRecordFactory(null)
-        .newRecordInstance(Counters.class);
-    return counters;
-  }
-
-  public static Counters incrTaskCounters(Counters counters,
-                                          Collection<Task> tasks) {
+  public static Counters incrTaskCounters(
+      Counters counters, Collection<Task> tasks) {
     for (Task task : tasks) {
-      incrAllCounters(counters, task.getCounters());
+      counters.incrAllCounters(task.getCounters());
     }
     return counters;
   }
 
-  public static void incrAllCounters(Counters counters, Counters other) {
-    if (other != null) {
-      for (CounterGroup otherGroup: other.getAllCounterGroups().values()) {
-        CounterGroup group = counters.getCounterGroup(otherGroup.getName());
-        if (group == null) {
-          group = RecordFactoryProvider.getRecordFactory(null)
-              .newRecordInstance(CounterGroup.class);
-          group.setName(otherGroup.getName());
-          counters.setCounterGroup(group.getName(), group);
-        }
-        group.setDisplayName(otherGroup.getDisplayName());
-        for (Counter otherCounter : otherGroup.getAllCounters().values()) {
-          Counter counter = group.getCounter(otherCounter.getName());
-          if (counter == null) {
-            counter = RecordFactoryProvider.getRecordFactory(null)
-                .newRecordInstance(Counter.class);
-            counter.setName(otherCounter.getName());
-            group.setCounter(counter.getName(), counter);
-          }
-          counter.setDisplayName(otherCounter.getDisplayName());
-          counter.setValue(counter.getValue() + otherCounter.getValue());
-        }
-      }
-    }
-  }
-
   @Override
   public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
       int fromEventId, int maxEvents) {
-    TaskAttemptCompletionEvent[] events = new TaskAttemptCompletionEvent[0];
+    TaskAttemptCompletionEvent[] events = EMPTY_TASK_ATTEMPT_COMPLETION_EVENTS;
     readLock.lock();
     try {
       if (taskAttemptCompletionEvents.size() > fromEventId) {
@@ -1204,13 +1145,24 @@ public class JobImpl implements org.apac
   // area. May need to create a new event type for this if JobFinished should 
   // not be generated for KilledJobs, etc.
   private static JobFinishedEvent createJobFinishedEvent(JobImpl job) {
+
+    Counters mapCounters = new Counters();
+    Counters reduceCounters = new Counters();
+    for (Task t : job.tasks.values()) {
+      Counters counters = t.getCounters();
+      switch (t.getType()) {
+        case MAP:     mapCounters.incrAllCounters(counters);     break;
+        case REDUCE:  reduceCounters.incrAllCounters(counters);  break;
+      }
+    }
+
     JobFinishedEvent jfe = new JobFinishedEvent(
         job.oldJobId, job.finishTime,
         job.succeededMapTaskCount, job.succeededReduceTaskCount,
         job.failedMapTaskCount, job.failedReduceTaskCount,
-        TypeConverter.fromYarn(job.getMapCounters()),
-        TypeConverter.fromYarn(job.getReduceCounters()),
-        TypeConverter.fromYarn(job.getCounters()));
+        mapCounters,
+        reduceCounters,
+        job.getAllCounters());
     return jfe;
   }
 
@@ -1450,7 +1402,8 @@ public class JobImpl implements org.apac
       JobCounterUpdateEvent jce = (JobCounterUpdateEvent) event;
       for (JobCounterUpdateEvent.CounterIncrementalUpdate ci : jce
           .getCounterUpdates()) {
-        job.jobCounters.incrCounter(ci.getCounterKey(), ci.getIncrementValue());
+        job.jobCounters.findCounter(ci.getCounterKey()).increment(
+          ci.getIncrementValue());
       }
     }
   }
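
The JobImpl changes above retire the record-factory Counters and the hand-rolled incrAllCounters()/newCounters() helpers in favor of org.apache.hadoop.mapreduce.Counters, which already knows how to create groups and counters on demand and to merge another Counters instance wholesale. A small usage sketch, assuming hadoop-mapreduce-client-core is on the classpath:

    import org.apache.hadoop.mapreduce.Counters;
    import org.apache.hadoop.mapreduce.TaskCounter;

    public class CounterMergeExample {
      public static void main(String[] args) {
        Counters total = new Counters();
        Counters taskCounters = new Counters();

        // findCounter() creates the group and counter on demand -- the work
        // the deleted RecordFactory-based helper did by hand.
        taskCounters.findCounter(TaskCounter.CPU_MILLISECONDS).increment(1500);

        // One call merges every group and counter from the other instance.
        total.incrAllCounters(taskCounters);

        System.out.println(
            total.findCounter(TaskCounter.CPU_MILLISECONDS).getValue()); // 1500
      }
    }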

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Mon Jan 16 04:24:24 2012
@@ -47,6 +47,8 @@ import org.apache.hadoop.mapred.Task;
 import org.apache.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.hadoop.mapred.WrappedJvmID;
 import org.apache.hadoop.mapred.WrappedProgressSplitsBlock;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobCounter;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
@@ -60,8 +62,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.api.records.Counter;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.Phase;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
@@ -132,6 +132,7 @@ public abstract class TaskAttemptImpl im
     org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
       EventHandler<TaskAttemptEvent> {
 
+  static final Counters EMPTY_COUNTERS = new Counters();
   private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
   private static final long MEMORY_SPLITS_RESOLUTION = 1024; //TODO Make configurable?
   private static final int MAP_MEMORY_MB_DEFAULT = 1024;
@@ -846,7 +847,7 @@ public abstract class TaskAttemptImpl im
       result.setDiagnosticInfo(StringUtils.join(LINE_SEPARATOR, getDiagnostics()));
       result.setPhase(reportedStatus.phase);
       result.setStateString(reportedStatus.stateString);
-      result.setCounters(getCounters());
+      result.setCounters(TypeConverter.toYarn(getCounters()));
       result.setContainerId(this.getAssignedContainerID());
       result.setNodeManagerHost(trackerName);
       result.setNodeManagerHttpPort(httpPort);
@@ -877,7 +878,7 @@ public abstract class TaskAttemptImpl im
     try {
       Counters counters = reportedStatus.counters;
       if (counters == null) {
-        counters = recordFactory.newRecordInstance(Counters.class);
+        counters = EMPTY_COUNTERS;
 //        counters.groups = new HashMap<String, CounterGroup>();
       }
       return counters;
@@ -1031,22 +1032,21 @@ public abstract class TaskAttemptImpl im
             (int) (now - start));
       }
 
-      Counter cpuCounter = counters.getCounter(
-          TaskCounter.CPU_MILLISECONDS);
+      Counter cpuCounter = counters.findCounter(TaskCounter.CPU_MILLISECONDS);
       if (cpuCounter != null && cpuCounter.getValue() <= Integer.MAX_VALUE) {
         splitsBlock.getProgressCPUTime().extend(newProgress,
-            (int) cpuCounter.getValue());
+            (int) cpuCounter.getValue()); // long to int? TODO: FIX. Same below
       }
 
-      Counter virtualBytes = counters.getCounter(
-          TaskCounter.VIRTUAL_MEMORY_BYTES);
+      Counter virtualBytes = counters
+        .findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES);
       if (virtualBytes != null) {
         splitsBlock.getProgressVirtualMemoryKbytes().extend(newProgress,
             (int) (virtualBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
       }
 
-      Counter physicalBytes = counters.getCounter(
-          TaskCounter.PHYSICAL_MEMORY_BYTES);
+      Counter physicalBytes = counters
+        .findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES);
       if (physicalBytes != null) {
         splitsBlock.getProgressPhysicalMemoryKbytes().extend(newProgress,
             (int) (physicalBytes.getValue() / (MEMORY_SPLITS_RESOLUTION)));
@@ -1201,7 +1201,7 @@ public abstract class TaskAttemptImpl im
 
       // register it to TaskAttemptListener so that it can start monitoring it.
       taskAttempt.taskAttemptListener
-        .registerLaunchedTask(taskAttempt.attemptId);
+        .registerLaunchedTask(taskAttempt.attemptId, taskAttempt.jvmID);
       //TODO Resolve to host / IP in case of a local address.
       InetSocketAddress nodeHttpInetAddr =
           NetUtils.createSocketAddr(taskAttempt.nodeHttpAddress); // TODO:
@@ -1343,7 +1343,7 @@ public abstract class TaskAttemptImpl im
          this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
-         TypeConverter.fromYarn(getCounters()),
+         getCounters(),
          getProgressSplitBlock().burst());
          eventHandler.handle(
            new JobHistoryEvent(attemptId.getTaskId().getJobId(), mfe));
@@ -1360,7 +1360,7 @@ public abstract class TaskAttemptImpl im
          this.containerNodeId == null ? -1 : this.containerNodeId.getPort(),
          this.nodeRackName == null ? "UNKNOWN" : this.nodeRackName,
          this.reportedStatus.stateString,
-         TypeConverter.fromYarn(getCounters()),
+         getCounters(),
          getProgressSplitBlock().burst());
          eventHandler.handle(
            new JobHistoryEvent(attemptId.getTaskId().getJobId(), rfe));
@@ -1498,8 +1498,8 @@ public abstract class TaskAttemptImpl im
     result.phase = Phase.STARTING;
     result.stateString = "NEW";
     result.taskState = TaskAttemptState.NEW;
-    Counters counters = recordFactory.newRecordInstance(Counters.class);
-//    counters.groups = new HashMap<String, CounterGroup>();
+    Counters counters = EMPTY_COUNTERS;
+    //    counters.groups = new HashMap<String, CounterGroup>();
     result.counters = counters;
   }
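
The patch itself flags the narrowing casts of counter values with "long to int? TODO: FIX". One possible fix, shown purely as an illustration rather than as part of this commit, is to saturate instead of truncate so values past Integer.MAX_VALUE degrade gracefully:

    public final class SafeCast {
      private SafeCast() {}

      // Clamp a long into the int range instead of letting the cast wrap.
      public static int saturatingIntCast(long value) {
        if (value > Integer.MAX_VALUE) {
          return Integer.MAX_VALUE;
        }
        if (value < Integer.MIN_VALUE) {
          return Integer.MIN_VALUE;
        }
        return (int) value;
      }

      public static void main(String[] args) {
        System.out.println(saturatingIntCast(5000000000L)); // 2147483647
      }
    }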
 

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Jan 16 04:24:24 2012
@@ -33,6 +33,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -40,7 +41,6 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
 import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -329,7 +329,6 @@ public abstract class TaskImpl implement
       report.setFinishTime(getFinishTime());
       report.setTaskState(getState());
       report.setProgress(getProgress());
-      report.setCounters(getCounters());
 
       for (TaskAttempt attempt : attempts.values()) {
         if (TaskAttemptState.RUNNING.equals(attempt.getState())) {
@@ -346,6 +345,11 @@ public abstract class TaskImpl implement
           
         }
       }
+
+      // Add a copy of counters as the last step so that their lifetime on heap
+      // is as small as possible.
+      report.setCounters(TypeConverter.toYarn(getCounters()));
+
       return report;
     } finally {
       readLock.unlock();
@@ -361,7 +365,7 @@ public abstract class TaskImpl implement
       if (bestAttempt != null) {
         counters = bestAttempt.getCounters();
       } else {
-        counters = recordFactory.newRecordInstance(Counters.class);
+        counters = TaskAttemptImpl.EMPTY_COUNTERS;
 //        counters.groups = new HashMap<CharSequence, CounterGroup>();
       }
       return counters;
@@ -595,7 +599,7 @@ public abstract class TaskImpl implement
         task.getFinishTime(task.successfulAttempt),
         TypeConverter.fromYarn(task.taskId.getTaskType()),
         taskState.toString(),
-        TypeConverter.fromYarn(task.getCounters()));
+        task.getCounters());
     return tfe;
   }
   

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java Mon Jan 16 04:24:24 2012
@@ -26,6 +26,7 @@ import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,8 +45,6 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
@@ -75,16 +74,217 @@ public class ContainerLauncherImpl exten
 
   int nmTimeOut;
 
+  private ConcurrentHashMap<ContainerId, Container> containers = 
+    new ConcurrentHashMap<ContainerId, Container>(); 
   private AppContext context;
-  private ThreadPoolExecutor launcherPool;
-  private static final int INITIAL_POOL_SIZE = 10;
+  protected ThreadPoolExecutor launcherPool;
+  protected static final int INITIAL_POOL_SIZE = 10;
   private int limitOnPoolSize;
   private Thread eventHandlingThread;
-  private BlockingQueue<ContainerLauncherEvent> eventQueue =
+  protected BlockingQueue<ContainerLauncherEvent> eventQueue =
       new LinkedBlockingQueue<ContainerLauncherEvent>();
   final Timer commandTimer = new Timer(true);
   YarnRPC rpc;
 
+  private Container getContainer(ContainerId id) {
+    Container c = containers.get(id);
+    if(c == null) {
+      c = new Container();
+      Container old = containers.putIfAbsent(id, c);
+      if(old != null) {
+        c = old;
+      }
+    }
+    return c;
+  }
+  
+  private void removeContainerIfDone(ContainerId id) {
+    Container c = containers.get(id);
+    if(c != null && c.isCompletelyDone()) {
+      containers.remove(id);
+    }
+  }
+  
+  private static enum ContainerState {
+    PREP, FAILED, RUNNING, DONE, KILLED_BEFORE_LAUNCH
+  }
+
+  private class Container {
+    private ContainerState state;
+    
+    public Container() {
+      this.state = ContainerState.PREP;
+    }
+    
+    public synchronized boolean isCompletelyDone() {
+      return state == ContainerState.DONE || state == ContainerState.FAILED;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public synchronized void launch(ContainerRemoteLaunchEvent event) {
+      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+      LOG.info("Launching " + taskAttemptID);
+      if(this.state == ContainerState.KILLED_BEFORE_LAUNCH) {
+        state = ContainerState.DONE;
+        sendContainerLaunchFailedMsg(taskAttemptID, 
+            "Container was killed before it was launched");
+        return;
+      }
+      CommandTimerTask timerTask = new CommandTimerTask(Thread
+          .currentThread(), event);
+      
+      final String containerManagerBindAddr = event.getContainerMgrAddress();
+      ContainerId containerID = event.getContainerID();
+      ContainerToken containerToken = event.getContainerToken();
+
+      ContainerManager proxy = null;
+      try {
+        commandTimer.schedule(timerTask, nmTimeOut);
+
+        proxy = getCMProxy(containerID, containerManagerBindAddr,
+            containerToken);
+
+        // Interrupted during getProxy, but that didn't throw exception
+        if (Thread.interrupted()) {
+          // The timer canceled the command in the mean while.
+          String message = "Container launch failed for " + containerID
+              + " : Start-container for " + event.getContainerID()
+              + " got interrupted. Returning.";
+          this.state = ContainerState.FAILED;
+          sendContainerLaunchFailedMsg(taskAttemptID, message);
+          return;
+        }
+        // Construct the actual Container
+        ContainerLaunchContext containerLaunchContext =
+          event.getContainer();
+
+        // Now launch the actual container
+        StartContainerRequest startRequest = Records
+          .newRecord(StartContainerRequest.class);
+        startRequest.setContainerLaunchContext(containerLaunchContext);
+        StartContainerResponse response = proxy.startContainer(startRequest);
+
+        // container started properly. Stop the timer
+        timerTask.cancel();
+        if (Thread.interrupted()) {
+          // The timer canceled the command in the mean while, but
+          // startContainer didn't throw exception
+          String message = "Container launch failed for " + containerID
+              + " : Start-container for " + event.getContainerID()
+              + " got interrupted. Returning.";
+          this.state = ContainerState.FAILED;
+          sendContainerLaunchFailedMsg(taskAttemptID, message);
+          return;
+        }
+
+        ByteBuffer portInfo = response
+          .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
+        int port = -1;
+        if(portInfo != null) {
+          port = ShuffleHandler.deserializeMetaData(portInfo);
+        }
+        LOG.info("Shuffle port returned by ContainerManager for "
+            + taskAttemptID + " : " + port);
+
+        if(port < 0) {
+          this.state = ContainerState.FAILED;
+          throw new IllegalStateException("Invalid shuffle port number "
+              + port + " returned for " + taskAttemptID);
+        }
+
+        // after launching, send launched event to task attempt to move
+        // it from ASSIGNED to RUNNING state
+        context.getEventHandler().handle(
+            new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
+        this.state = ContainerState.RUNNING;
+      } catch (Throwable t) {
+        if (Thread.interrupted()) {
+          // The timer canceled the command in the mean while.
+          LOG.info("Start-container for " + event.getContainerID()
+              + " got interrupted.");
+        }
+        String message = "Container launch failed for " + containerID + " : "
+            + StringUtils.stringifyException(t);
+        this.state = ContainerState.FAILED;
+        sendContainerLaunchFailedMsg(taskAttemptID, message);
+      } finally {
+        timerTask.cancel();
+        if (proxy != null) {
+          ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+        }
+      }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public synchronized void kill(ContainerLauncherEvent event) {
+      if(this.state == ContainerState.PREP) {
+        this.state = ContainerState.KILLED_BEFORE_LAUNCH;
+      } else {
+        CommandTimerTask timerTask = new CommandTimerTask(Thread
+            .currentThread(), event);
+
+        final String containerManagerBindAddr = event.getContainerMgrAddress();
+        ContainerId containerID = event.getContainerID();
+        ContainerToken containerToken = event.getContainerToken();
+        TaskAttemptId taskAttemptID = event.getTaskAttemptID();
+        LOG.info("KILLING " + taskAttemptID);
+        commandTimer.schedule(timerTask, nmTimeOut);
+
+        ContainerManager proxy = null;
+        try {
+          proxy = getCMProxy(containerID, containerManagerBindAddr,
+              containerToken);
+
+          if (Thread.interrupted()) {
+            // The timer canceled the command in the mean while. No need to
+            // return, send cleaned up event anyways.
+            LOG.info("Stop-container for " + event.getContainerID()
+                + " got interrupted.");
+          } else {
+            // kill the remote container if already launched
+            StopContainerRequest stopRequest = Records
+              .newRecord(StopContainerRequest.class);
+            stopRequest.setContainerId(event.getContainerID());
+            proxy.stopContainer(stopRequest);
+          }
+        } catch (Throwable t) {
+
+          if (Thread.interrupted()) {
+            // The timer canceled the command in the mean while, clear the
+            // interrupt flag
+            LOG.info("Stop-container for " + event.getContainerID()
+                + " got interrupted.");
+          }
+
+          // ignore the cleanup failure
+          String message = "cleanup failed for container "
+            + event.getContainerID() + " : "
+            + StringUtils.stringifyException(t);
+          context.getEventHandler().handle(
+            new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID, message));
+          LOG.warn(message);
+        } finally {
+          timerTask.cancel();
+          if (Thread.interrupted()) {
+            LOG.info("Stop-container for " + event.getContainerID()
+                + " got interrupted.");
+            // ignore the cleanup failure
+            context.getEventHandler().handle(
+              new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
+                "cleanup failed for container " + event.getContainerID()));
+          }
+          if (proxy != null) {
+            ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
+          }
+        }
+        this.state = ContainerState.DONE;
+      }
+      // after killing, send killed event to task attempt
+      context.getEventHandler().handle(
+          new TaskAttemptEvent(event.getTaskAttemptID(),
+              TaskAttemptEventType.TA_CONTAINER_CLEANED));
+    }
+  }
   // To track numNodes.
   Set<String> allNodes = new HashSet<String>();
 
@@ -102,11 +302,16 @@ public class ContainerLauncherImpl exten
     this.limitOnPoolSize = conf.getInt(
         MRJobConfig.MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT,
         MRJobConfig.DEFAULT_MR_AM_CONTAINERLAUNCHER_THREAD_COUNT_LIMIT);
+    LOG.info("Upper limit on the thread pool size is " + this.limitOnPoolSize);
     this.nmTimeOut = conf.getInt(ContainerLauncher.MR_AM_NM_COMMAND_TIMEOUT,
         ContainerLauncher.DEFAULT_NM_COMMAND_TIMEOUT);
-    this.rpc = YarnRPC.create(conf);
+    this.rpc = createYarnRPC(conf);
     super.init(conf);
   }
+  
+  protected YarnRPC createYarnRPC(Configuration conf) {
+    return YarnRPC.create(conf);
+  }
 
   public void start() {
 
@@ -118,7 +323,7 @@ public class ContainerLauncherImpl exten
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
         tf);
-    eventHandlingThread = new Thread(new Runnable() {
+    eventHandlingThread = new Thread() {
       @Override
       public void run() {
         ContainerLauncherEvent event = null;
@@ -141,26 +346,27 @@ public class ContainerLauncherImpl exten
             int numNodes = allNodes.size();
             int idealPoolSize = Math.min(limitOnPoolSize, numNodes);
 
-            if (poolSize <= idealPoolSize) {
+            if (poolSize < idealPoolSize) {
               // Bump up the pool size to idealPoolSize+INITIAL_POOL_SIZE, the
              // latter is just a buffer so we are not always increasing the
               // pool-size
-              int newPoolSize = idealPoolSize + INITIAL_POOL_SIZE;
-              LOG.info("Setting ContainerLauncher pool size to "
-                  + newPoolSize);
+              int newPoolSize = Math.min(limitOnPoolSize, idealPoolSize
+                  + INITIAL_POOL_SIZE);
+              LOG.info("Setting ContainerLauncher pool size to " + newPoolSize
+                  + " as number-of-nodes to talk to is " + numNodes);
               launcherPool.setCorePoolSize(newPoolSize);
             }
           }
 
           // the events from the queue are handled in parallel
           // using a thread pool
-          launcherPool.execute(new EventProcessor(event));
+          launcherPool.execute(createEventProcessor(event));
 
           // TODO: Group launching of multiple containers to a single
           // NodeManager into a single connection
         }
       }
-    });
+    };
     eventHandlingThread.setName("ContainerLauncher Event Handler");
     eventHandlingThread.start();
     super.start();
@@ -172,14 +378,16 @@ public class ContainerLauncherImpl exten
     super.stop();
   }
 
+  protected EventProcessor createEventProcessor(ContainerLauncherEvent event) {
+    return new EventProcessor(event);
+  }
+
   protected ContainerManager getCMProxy(ContainerId containerID,
       final String containerManagerBindAddr, ContainerToken containerToken)
       throws IOException {
 
     UserGroupInformation user = UserGroupInformation.getCurrentUser();
 
-    this.allNodes.add(containerManagerBindAddr);
-
     if (UserGroupInformation.isSecurityEnabled()) {
       Token<ContainerTokenIdentifier> token = new Token<ContainerTokenIdentifier>(
           containerToken.getIdentifier().array(), containerToken
@@ -244,182 +452,35 @@ public class ContainerLauncherImpl exten
   /**
    * Setup and start the container on remote nodemanager.
    */
-  private class EventProcessor implements Runnable {
+  class EventProcessor implements Runnable {
     private ContainerLauncherEvent event;
 
     EventProcessor(ContainerLauncherEvent event) {
       this.event = event;
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     public void run() {
       LOG.info("Processing the event " + event.toString());
 
       // Load ContainerManager tokens before creating a connection.
       // TODO: Do it only once per NodeManager.
-      final String containerManagerBindAddr = event.getContainerMgrAddress();
       ContainerId containerID = event.getContainerID();
-      ContainerToken containerToken = event.getContainerToken();
-      TaskAttemptId taskAttemptID = event.getTaskAttemptID();
-
-      ContainerManager proxy = null;
-
-      CommandTimerTask timerTask = new CommandTimerTask(Thread
-          .currentThread(), event);
 
+      Container c = getContainer(containerID);
       switch(event.getType()) {
 
       case CONTAINER_REMOTE_LAUNCH:
         ContainerRemoteLaunchEvent launchEvent
             = (ContainerRemoteLaunchEvent) event;
-
-        try {
-          commandTimer.schedule(timerTask, nmTimeOut);
-
-          proxy = getCMProxy(containerID, containerManagerBindAddr,
-              containerToken);
-
-          // Interruped during getProxy, but that didn't throw exception
-          if (Thread.interrupted()) {
-            // The timer cancelled the command in the mean while.
-            String message = "Container launch failed for " + containerID
-                + " : Start-container for " + event.getContainerID()
-                + " got interrupted. Returning.";
-            sendContainerLaunchFailedMsg(taskAttemptID, message);
-            return;
-          }
-
-          // Construct the actual Container
-          ContainerLaunchContext containerLaunchContext =
-              launchEvent.getContainer();
-
-          // Now launch the actual container
-          StartContainerRequest startRequest = Records
-              .newRecord(StartContainerRequest.class);
-          startRequest.setContainerLaunchContext(containerLaunchContext);
-          StartContainerResponse response = proxy.startContainer(startRequest);
-
-          // container started properly. Stop the timer
-          timerTask.cancel();
-          if (Thread.interrupted()) {
-            // The timer cancelled the command in the mean while, but
-            // startContainer didn't throw exception
-            String message = "Container launch failed for " + containerID
-                + " : Start-container for " + event.getContainerID()
-                + " got interrupted. Returning.";
-            sendContainerLaunchFailedMsg(taskAttemptID, message);
-            return;
-          }
-
-          ByteBuffer portInfo = response
-              .getServiceResponse(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID);
-          int port = -1;
-          if(portInfo != null) {
-            port = ShuffleHandler.deserializeMetaData(portInfo);
-          }
-          LOG.info("Shuffle port returned by ContainerManager for "
-              + taskAttemptID + " : " + port);
-          
-          if(port < 0) {
-            throw new IllegalStateException("Invalid shuffle port number "
-                + port + " returned for " + taskAttemptID);
-          }
-
-          // after launching, send launched event to task attempt to move
-          // it from ASSIGNED to RUNNING state
-          context.getEventHandler().handle(
-              new TaskAttemptContainerLaunchedEvent(taskAttemptID, port));
-        } catch (Throwable t) {
-          if (Thread.interrupted()) {
-            // The timer cancelled the command in the mean while.
-            LOG.info("Start-container for " + event.getContainerID()
-                + " got interrupted.");
-          }
-          String message = "Container launch failed for " + containerID
-              + " : " + StringUtils.stringifyException(t);
-          sendContainerLaunchFailedMsg(taskAttemptID, message);
-        } finally {
-          timerTask.cancel();
-          if (proxy != null) {
-            ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
-          }
-        }
-
+        c.launch(launchEvent);
         break;
 
       case CONTAINER_REMOTE_CLEANUP:
-        // We will have to remove the launch event if it is still in eventQueue
-        // and not yet processed
-        if (eventQueue.contains(event)) {
-          eventQueue.remove(event); // TODO: Any synchro needed?
-          //deallocate the container
-          context.getEventHandler().handle(
-              new ContainerAllocatorEvent(taskAttemptID,
-                  ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
-        } else {
-
-          try {
-            commandTimer.schedule(timerTask, nmTimeOut);
-
-            proxy = getCMProxy(containerID, containerManagerBindAddr,
-                containerToken);
-
-            if (Thread.interrupted()) {
-              // The timer cancelled the command in the mean while. No need to
-              // return, send cleanedup event anyways.
-              LOG.info("Stop-container for " + event.getContainerID()
-                  + " got interrupted.");
-            } else {
-
-              // TODO:check whether container is launched
-
-              // kill the remote container if already launched
-              StopContainerRequest stopRequest = Records
-                  .newRecord(StopContainerRequest.class);
-              stopRequest.setContainerId(event.getContainerID());
-              proxy.stopContainer(stopRequest);
-            }
-          } catch (Throwable t) {
-
-            if (Thread.interrupted()) {
-              // The timer cancelled the command in the mean while, clear the
-              // interrupt flag
-              LOG.info("Stop-container for " + event.getContainerID()
-                  + " got interrupted.");
-            }
-
-            // ignore the cleanup failure
-            String message = "cleanup failed for container "
-                + event.getContainerID() + " : "
-                + StringUtils.stringifyException(t);
-            context.getEventHandler()
-                .handle(
-                    new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
-                        message));
-            LOG.warn(message);
-          } finally {
-            timerTask.cancel();
-            if (Thread.interrupted()) {
-              LOG.info("Stop-container for " + event.getContainerID()
-                  + " got interrupted.");
-              // ignore the cleanup failure
-              context.getEventHandler()
-                  .handle(new TaskAttemptDiagnosticsUpdateEvent(taskAttemptID,
-                    "cleanup failed for container " + event.getContainerID()));
-            }
-            if (proxy != null) {
-              ContainerLauncherImpl.this.rpc.stopProxy(proxy, getConfig());
-            }
-          }
-
-          // after killing, send killed event to taskattempt
-          context.getEventHandler().handle(
-              new TaskAttemptEvent(event.getTaskAttemptID(),
-                  TaskAttemptEventType.TA_CONTAINER_CLEANED));
-        }
+        c.kill(event);
         break;
       }
+      removeContainerIfDone(containerID);
     }
   }
 
@@ -438,6 +499,7 @@ public class ContainerLauncherImpl exten
   public void handle(ContainerLauncherEvent event) {
     try {
       eventQueue.put(event);
+      this.allNodes.add(event.getContainerMgrAddress());
     } catch (InterruptedException e) {
       throw new YarnException(e);
     }
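
The large inline launch and cleanup blocks removed above are replaced by two calls on a per-container object, c.launch(launchEvent) and c.kill(event), followed by removeContainerIfDone(containerID). The Container class itself is not part of this hunk, so the following is only a minimal sketch of the state-tracking pattern those calls imply; every name in it is assumed for illustration, not taken from the commit.

    // Hypothetical sketch: a per-container state machine. launch() and kill()
    // synchronize on the same monitor, so a kill that arrives while the launch
    // is still queued suppresses the launch instead of racing it over RPC.
    enum State { PREP, RUNNING, DONE, KILLED_BEFORE_LAUNCH }

    class ContainerSketch {
      private State state = State.PREP;

      synchronized void launch() {
        if (state == State.KILLED_BEFORE_LAUNCH) {
          return;                    // a kill already won the race; never start
        }
        // ... the startContainer RPC would go here ...
        state = State.RUNNING;
      }

      synchronized void kill() {
        if (state == State.PREP) {
          state = State.KILLED_BEFORE_LAUNCH; // never launched, nothing to stop
        } else if (state == State.RUNNING) {
          // ... the stopContainer RPC would go here ...
          state = State.DONE;
        }                            // otherwise the kill is a no-op
      }

      synchronized boolean isCompletelyDone() {
        return state == State.DONE || state == State.KILLED_BEFORE_LAUNCH;
      }
    }

Under that reading, removeContainerIfDone(containerID) after each event evicts entries for which isCompletelyDone() holds, which also replaces the old eventQueue.contains(event) scan.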

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerRemoteLaunchEvent.java Mon Jan 16 04:24:24 2012
@@ -46,6 +46,11 @@ public class ContainerRemoteLaunchEvent 
   public Task getRemoteTask() {
     return this.task;
   }
+  
+  @Override
+  public int hashCode() {
+    return super.hashCode();
+  }
 
   @Override
   public boolean equals(Object obj) {
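
The hashCode() override added here completes the java.lang.Object contract for a class that already overrides equals(Object): objects that compare equal must hash equally, or hash-based collections will miss them. Delegating both methods to the superclass is the standard way to say "equality ignores the fields this subclass adds". A self-contained, generic illustration (class and field names are illustrative only, not from this patch):

    // A subclass adds a field that deliberately does NOT participate in
    // equality, so it pins BOTH equals() and hashCode() to the superclass.
    class BaseEvent {
      private final String id;
      BaseEvent(String id) { this.id = id; }
      @Override public boolean equals(Object o) {
        return o instanceof BaseEvent && id.equals(((BaseEvent) o).id);
      }
      @Override public int hashCode() { return id.hashCode(); }
    }

    class RichEvent extends BaseEvent {
      private final Object payload;  // extra state, excluded from equality
      RichEvent(String id, Object payload) { super(id); this.payload = payload; }
      @Override public boolean equals(Object o) { return super.equals(o); }
      @Override public int hashCode() { return super.hashCode(); }
    }

Overriding only equals(), as the class previously did, would let a RichEvent and a BaseEvent with the same id compare equal yet land in different buckets of a HashMap or HashSet.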

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java Mon Jan 16 04:24:24 2012
@@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.util.Conve
 
 // TODO:
 // task cleanup for all non-completed tasks
-
 public class RecoveryService extends CompositeService implements Recovery {
 
   private static final Log LOG = LogFactory.getLog(RecoveryService.class);
@@ -411,8 +410,7 @@ public class RecoveryService extends Com
       if (cntrs == null) {
         taskAttemptStatus.counters = null;
       } else {
-        taskAttemptStatus.counters = TypeConverter.toYarn(attemptInfo
-            .getCounters());
+        taskAttemptStatus.counters = cntrs;
       }
       actualHandler.handle(new TaskAttemptStatusUpdateEvent(
           taskAttemptStatus.id, taskAttemptStatus));

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/AMWebServices.java Mon Jan 16 04:24:24 2012
@@ -33,6 +33,7 @@ import javax.ws.rs.core.Response.Status;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
@@ -42,6 +43,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AppInfo;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AMAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AMAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobInfo;
@@ -76,15 +79,91 @@ public class AMWebServices {
   }
 
   Boolean hasAccess(Job job, HttpServletRequest request) {
-    UserGroupInformation callerUgi = UserGroupInformation
-        .createRemoteUser(request.getRemoteUser());
-    if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
+    String remoteUser = request.getRemoteUser();
+    UserGroupInformation callerUGI = null;
+    if (remoteUser != null) {
+      callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    if (callerUGI != null && !job.checkAccess(callerUGI, JobACL.VIEW_JOB)) {
       return false;
     }
     return true;
   }
 
   /**
+   * convert a job id string to an actual job and handle all the error checking.
+   */
+  public static Job getJobFromJobIdString(String jid, AppContext appCtx) throws NotFoundException {
+    JobId jobId;
+    Job job;
+    try {
+      jobId = MRApps.toJobID(jid);
+    } catch (YarnException e) {
+      throw new NotFoundException(e.getMessage());
+    }
+    if (jobId == null) {
+      throw new NotFoundException("job, " + jid + ", is not found");
+    }
+    job = appCtx.getJob(jobId);
+    if (job == null) {
+      throw new NotFoundException("job, " + jid + ", is not found");
+    }
+    return job;
+  }
+
+  /**
+   * convert a task id string to an actual task and handle all the error
+   * checking.
+   */
+  public static Task getTaskFromTaskIdString(String tid, Job job) throws NotFoundException {
+    TaskId taskID;
+    Task task;
+    try {
+      taskID = MRApps.toTaskID(tid);
+    } catch (YarnException e) {
+      throw new NotFoundException(e.getMessage());
+    } catch (NumberFormatException ne) {
+      throw new NotFoundException(ne.getMessage());
+    }
+    if (taskID == null) {
+      throw new NotFoundException("taskid " + tid + " not found or invalid");
+    }
+    task = job.getTask(taskID);
+    if (task == null) {
+      throw new NotFoundException("task not found with id " + tid);
+    }
+    return task;
+  }
+
+  /**
+   * convert a task attempt id string to an actual task attempt and handle all
+   * the error checking.
+   */
+  public static TaskAttempt getTaskAttemptFromTaskAttemptString(String attId, Task task)
+      throws NotFoundException {
+    TaskAttemptId attemptId;
+    TaskAttempt ta;
+    try {
+      attemptId = MRApps.toTaskAttemptID(attId);
+    } catch (YarnException e) {
+      throw new NotFoundException(e.getMessage());
+    } catch (NumberFormatException ne) {
+      throw new NotFoundException(ne.getMessage());
+    }
+    if (attemptId == null) {
+      throw new NotFoundException("task attempt id " + attId
+          + " not found or invalid");
+    }
+    ta = task.getAttempt(attemptId);
+    if (ta == null) {
+      throw new NotFoundException("Error getting info on task attempt id "
+          + attId);
+    }
+    return ta;
+  }
+
+  /**
    * check for job access.
    *
    * @param job
@@ -130,59 +209,33 @@ public class AMWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public JobInfo getJob(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid) {
-    JobId jobId = MRApps.toJobID(jid);
-    if (jobId == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
-    Job job = appCtx.getJob(jobId);
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+    Job job = getJobFromJobIdString(jid, appCtx);
     return new JobInfo(job, hasAccess(job, hsr));
-
   }
 
   @GET
-  @Path("/jobs/{jobid}/counters")
+  @Path("/jobs/{jobid}/jobattempts")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-  public JobCounterInfo getJobCounters(@Context HttpServletRequest hsr,
-      @PathParam("jobid") String jid) {
-    JobId jobId = MRApps.toJobID(jid);
-    if (jobId == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
-    Job job = appCtx.getJob(jobId);
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
+  public AMAttemptsInfo getJobAttempts(@PathParam("jobid") String jid) {
+
+    Job job = getJobFromJobIdString(jid, appCtx);
+    AMAttemptsInfo amAttempts = new AMAttemptsInfo();
+    for (AMInfo amInfo : job.getAMInfos()) {
+      AMAttemptInfo attempt = new AMAttemptInfo(amInfo, MRApps.toString(
+            job.getID()), job.getUserName());
+      amAttempts.add(attempt);
     }
-    checkAccess(job, hsr);
-    return new JobCounterInfo(this.appCtx, job);
+    return amAttempts;
   }
 
   @GET
-  @Path("/jobs/{jobid}/tasks/{taskid}/counters")
+  @Path("/jobs/{jobid}/counters")
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
-  public JobTaskCounterInfo getSingleTaskCounters(
-      @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
-      @PathParam("taskid") String tid) {
-    JobId jobId = MRApps.toJobID(jid);
-    if (jobId == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
-    Job job = this.appCtx.getJob(jobId);
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+  public JobCounterInfo getJobCounters(@Context HttpServletRequest hsr,
+      @PathParam("jobid") String jid) {
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
-    TaskId taskID = MRApps.toTaskID(tid);
-    if (taskID == null) {
-      throw new NotFoundException("taskid " + tid + " not found or invalid");
-    }
-    Task task = job.getTask(taskID);
-    if (task == null) {
-      throw new NotFoundException("task not found with id " + tid);
-    }
-    return new JobTaskCounterInfo(task);
+    return new JobCounterInfo(this.appCtx, job);
   }
 
   @GET
@@ -190,20 +243,15 @@ public class AMWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public ConfInfo getJobConf(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid) {
-    JobId jobId = MRApps.toJobID(jid);
-    if (jobId == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
-    Job job = appCtx.getJob(jobId);
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
     ConfInfo info;
     try {
       info = new ConfInfo(job, this.conf);
     } catch (IOException e) {
-      throw new NotFoundException("unable to load configuration for job: " + jid);
+      throw new NotFoundException("unable to load configuration for job: "
+          + jid);
     }
     return info;
   }
@@ -213,10 +261,8 @@ public class AMWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public TasksInfo getJobTasks(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid, @QueryParam("type") String type) {
-    Job job = this.appCtx.getJob(MRApps.toJobID(jid));
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
     TasksInfo allTasks = new TasksInfo();
     for (Task task : job.getTasks().values()) {
@@ -225,7 +271,8 @@ public class AMWebServices {
         try {
           ttype = MRApps.taskType(type);
         } catch (YarnException e) {
-          throw new BadRequestException("tasktype must be either m or r");        }
+          throw new BadRequestException("tasktype must be either m or r");
+        }
       }
       if (ttype != null && task.getType() != ttype) {
         continue;
@@ -240,21 +287,24 @@ public class AMWebServices {
   @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
   public TaskInfo getJobTask(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
-    Job job = this.appCtx.getJob(MRApps.toJobID(jid));
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
-    TaskId taskID = MRApps.toTaskID(tid);
-    if (taskID == null) {
-      throw new NotFoundException("taskid " + tid + " not found or invalid");
-    }
-    Task task = job.getTask(taskID);
-    if (task == null) {
-      throw new NotFoundException("task not found with id " + tid);
-    }
+    Task task = getTaskFromTaskIdString(tid, job);
     return new TaskInfo(task);
+  }
+
+  @GET
+  @Path("/jobs/{jobid}/tasks/{taskid}/counters")
+  @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
+  public JobTaskCounterInfo getSingleTaskCounters(
+      @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
+      @PathParam("taskid") String tid) {
 
+    Job job = getJobFromJobIdString(jid, appCtx);
+    checkAccess(job, hsr);
+    Task task = getTaskFromTaskIdString(tid, job);
+    return new JobTaskCounterInfo(task);
   }
 
   @GET
@@ -263,19 +313,11 @@ public class AMWebServices {
   public TaskAttemptsInfo getJobTaskAttempts(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid, @PathParam("taskid") String tid) {
     TaskAttemptsInfo attempts = new TaskAttemptsInfo();
-    Job job = this.appCtx.getJob(MRApps.toJobID(jid));
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
-    TaskId taskID = MRApps.toTaskID(tid);
-    if (taskID == null) {
-      throw new NotFoundException("taskid " + tid + " not found or invalid");
-    }
-    Task task = job.getTask(taskID);
-    if (task == null) {
-      throw new NotFoundException("task not found with id " + tid);
-    }
+    Task task = getTaskFromTaskIdString(tid, job);
+
     for (TaskAttempt ta : task.getAttempts().values()) {
       if (ta != null) {
         if (task.getType() == TaskType.REDUCE) {
@@ -294,29 +336,11 @@ public class AMWebServices {
   public TaskAttemptInfo getJobTaskAttemptId(@Context HttpServletRequest hsr,
       @PathParam("jobid") String jid, @PathParam("taskid") String tid,
       @PathParam("attemptid") String attId) {
-    Job job = this.appCtx.getJob(MRApps.toJobID(jid));
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
-    TaskId taskID = MRApps.toTaskID(tid);
-    if (taskID == null) {
-      throw new NotFoundException("taskid " + tid + " not found or invalid");
-    }
-    Task task = job.getTask(taskID);
-    if (task == null) {
-      throw new NotFoundException("task not found with id " + tid);
-    }
-    TaskAttemptId attemptId = MRApps.toTaskAttemptID(attId);
-    if (attemptId == null) {
-      throw new NotFoundException("task attempt id " + attId
-          + " not found or invalid");
-    }
-    TaskAttempt ta = task.getAttempt(attemptId);
-    if (ta == null) {
-      throw new NotFoundException("Error getting info on task attempt id "
-          + attId);
-    }
+    Task task = getTaskFromTaskIdString(tid, job);
+    TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
     if (task.getType() == TaskType.REDUCE) {
       return new ReduceTaskAttemptInfo(ta, task.getType());
     } else {
@@ -330,33 +354,11 @@ public class AMWebServices {
   public JobTaskAttemptCounterInfo getJobTaskAttemptIdCounters(
       @Context HttpServletRequest hsr, @PathParam("jobid") String jid,
       @PathParam("taskid") String tid, @PathParam("attemptid") String attId) {
-    JobId jobId = MRApps.toJobID(jid);
-    if (jobId == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
-    Job job = this.appCtx.getJob(jobId);
-    if (job == null) {
-      throw new NotFoundException("job, " + jid + ", is not found");
-    }
+
+    Job job = getJobFromJobIdString(jid, appCtx);
     checkAccess(job, hsr);
-    TaskId taskID = MRApps.toTaskID(tid);
-    if (taskID == null) {
-      throw new NotFoundException("taskid " + tid + " not found or invalid");
-    }
-    Task task = job.getTask(taskID);
-    if (task == null) {
-      throw new NotFoundException("task not found with id " + tid);
-    }
-    TaskAttemptId attemptId = MRApps.toTaskAttemptID(attId);
-    if (attemptId == null) {
-      throw new NotFoundException("task attempt id " + attId
-          + " not found or invalid");
-    }
-    TaskAttempt ta = task.getAttempt(attemptId);
-    if (ta == null) {
-      throw new NotFoundException("Error getting info on task attempt id "
-          + attId);
-    }
+    Task task = getTaskFromTaskIdString(tid, job);
+    TaskAttempt ta = getTaskAttemptFromTaskAttemptString(attId, task);
     return new JobTaskAttemptCounterInfo(ta);
   }
 }
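
The three new static helpers fold the parse/validate/lookup boilerplate that every resource method used to repeat into one place, and they map each failure mode (unparseable id, null id, unknown id) to NotFoundException, i.e. an HTTP 404. A hypothetical JUnit-style check of that contract (the test class is invented here, and it assumes MRApps.toJobID rejects a malformed id with a YarnException, as the helper's catch block anticipates):

    import static org.junit.Assert.fail;

    import org.apache.hadoop.yarn.webapp.NotFoundException;
    import org.junit.Test;

    // Assumed to live in org.apache.hadoop.mapreduce.v2.app.webapp so the
    // static helper is visible without an import.
    public class TestJobIdLookup {

      @Test
      public void malformedJobIdBecomes404() {
        try {
          // appCtx is null on purpose: on a parse failure the helper should
          // throw before it ever consults the AppContext.
          AMWebServices.getJobFromJobIdString("not_a_job_id", null);
          fail("expected NotFoundException");
        } catch (NotFoundException expected) {
          // the malformed id surfaces to the REST client as a 404, not a 500
        }
      }
    }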

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/CountersBlock.java Mon Jan 16 04:24:24 2012
@@ -18,25 +18,32 @@
 
 package org.apache.hadoop.mapreduce.v2.app.webapp;
 
-import com.google.inject.Inject;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
+import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_ID;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_TABLE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
+
 import java.util.Map;
 
-import org.apache.hadoop.mapreduce.v2.api.records.Counter;
-import org.apache.hadoop.mapreduce.v2.api.records.CounterGroup;
-import org.apache.hadoop.mapreduce.v2.api.records.Counters;
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
-import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
-import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.*;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.THEAD;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TR;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 
-import static org.apache.hadoop.mapreduce.v2.app.webapp.AMWebApp.*;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
+import com.google.inject.Inject;
 
 public class CountersBlock extends HtmlBlock {
   Job job;
@@ -62,8 +69,7 @@ public class CountersBlock extends HtmlB
       return;
     }
     
-    if(total == null || total.getAllCounterGroups() == null || 
-        total.getAllCounterGroups().size() <= 0) {
+    if(total == null || total.getGroupNames() == null) {
       String type = $(TASK_ID);
       if(type == null || type.isEmpty()) {
         type = $(JOB_ID, "the job");
@@ -93,9 +99,9 @@ public class CountersBlock extends HtmlB
             th(".group.ui-state-default", "Counter Group").
             th(".ui-state-default", "Counters")._()._().
         tbody();
-    for (CounterGroup g : total.getAllCounterGroups().values()) {
-      CounterGroup mg = map == null ? null : map.getCounterGroup(g.getName());
-      CounterGroup rg = reduce == null ? null : reduce.getCounterGroup(g.getName());
+    for (CounterGroup g : total) {
+      CounterGroup mg = map == null ? null : map.getGroup(g.getName());
+      CounterGroup rg = reduce == null ? null : reduce.getGroup(g.getName());
       ++numGroups;
       // This is mostly for demonstration :) Typically we'd introduce
       // a CounterGroup block to reduce the verbosity. OTOH, this
@@ -116,7 +122,7 @@ public class CountersBlock extends HtmlB
       TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>> group = groupHeadRow.
             th(map == null ? "Value" : "Total")._()._().
         tbody();
-      for (Counter counter : g.getAllCounters().values()) {
+      for (Counter counter : g) {
         // Ditto
         TR<TBODY<TABLE<TD<TR<TBODY<TABLE<DIV<Hamlet>>>>>>>> groupRow = group.
           tr();
@@ -130,8 +136,8 @@ public class CountersBlock extends HtmlB
             _();
           }
         if (map != null) {
-          Counter mc = mg == null ? null : mg.getCounter(counter.getName());
-          Counter rc = rg == null ? null : rg.getCounter(counter.getName());
+          Counter mc = mg == null ? null : mg.findCounter(counter.getName());
+          Counter rc = rg == null ? null : rg.findCounter(counter.getName());
           groupRow.
             td(mc == null ? "0" : String.valueOf(mc.getValue())).
             td(rc == null ? "0" : String.valueOf(rc.getValue()));
@@ -173,14 +179,14 @@ public class CountersBlock extends HtmlB
     }
     // Get all types of counters
     Map<TaskId, Task> tasks = job.getTasks();
-    total = job.getCounters();
-    map = JobImpl.newCounters();
-    reduce = JobImpl.newCounters();
+    total = job.getAllCounters();
+    map = new Counters();
+    reduce = new Counters();
     for (Task t : tasks.values()) {
       Counters counters = t.getCounters();
       switch (t.getType()) {
-        case MAP:     JobImpl.incrAllCounters(map, counters);     break;
-        case REDUCE:  JobImpl.incrAllCounters(reduce, counters);  break;
+        case MAP:     map.incrAllCounters(counters);     break;
+        case REDUCE:  reduce.incrAllCounters(counters);  break;
       }
     }
   }
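
CountersBlock now walks org.apache.hadoop.mapreduce.Counters directly instead of the v2 record types: a Counters instance iterates its groups, each CounterGroup iterates its counters, getGroup()/findCounter() do point lookups, and per-type totals are built with incrAllCounters(). A small self-contained sketch of exactly those calls:

    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.CounterGroup;
    import org.apache.hadoop.mapreduce.Counters;

    public class CountersTour {

      // Merge per-task counters into one total, as getCounters() above now
      // does for the map and reduce columns.
      public static Counters sum(Iterable<Counters> perTask) {
        Counters total = new Counters();
        for (Counters c : perTask) {
          total.incrAllCounters(c);  // creates missing groups/counters on the fly
        }
        return total;
      }

      // Iterate the way the rendering loop does: groups, then counters,
      // round-tripping through findCounter() just to show the point lookup.
      public static void print(Counters total) {
        for (CounterGroup g : total) {
          for (Counter counter : g) {
            Counter c = g.findCounter(counter.getName());
            System.out.println(g.getName() + "." + c.getName()
                + " = " + c.getValue());
          }
        }
      }
    }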

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JAXBContextResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JAXBContextResolver.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JAXBContextResolver.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JAXBContextResolver.java Mon Jan 16 04:24:24 2012
@@ -30,6 +30,8 @@ import javax.ws.rs.ext.ContextResolver;
 import javax.ws.rs.ext.Provider;
 import javax.xml.bind.JAXBContext;
 
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AMAttemptInfo;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AMAttemptsInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AppInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.ConfEntryInfo;
@@ -47,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskCounterInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TasksInfo;
+import org.apache.hadoop.yarn.webapp.RemoteExceptionData;
 
 @Singleton
 @Provider
@@ -54,22 +57,22 @@ public class JAXBContextResolver impleme
 
   private JAXBContext context;
   private final Set<Class> types;
-    
+
   // you have to specify all the dao classes here
-  private final Class[] cTypes = {AppInfo.class, CounterInfo.class,
-      JobTaskAttemptCounterInfo.class, JobTaskCounterInfo.class,
-      TaskCounterGroupInfo.class, ConfInfo.class, JobCounterInfo.class,
-      TaskCounterInfo.class, CounterGroupInfo.class, JobInfo.class, 
-      JobsInfo.class, ReduceTaskAttemptInfo.class, TaskAttemptInfo.class,
-      TaskInfo.class, TasksInfo.class, TaskAttemptsInfo.class,
-      ConfEntryInfo.class};
-    
+  private final Class[] cTypes = {AMAttemptInfo.class, AMAttemptsInfo.class,
+    AppInfo.class, CounterInfo.class, JobTaskAttemptCounterInfo.class,
+    JobTaskCounterInfo.class, TaskCounterGroupInfo.class, ConfInfo.class,
+    JobCounterInfo.class, TaskCounterInfo.class, CounterGroupInfo.class,
+    JobInfo.class, JobsInfo.class, ReduceTaskAttemptInfo.class,
+    TaskAttemptInfo.class, TaskInfo.class, TasksInfo.class,
+    TaskAttemptsInfo.class, ConfEntryInfo.class, RemoteExceptionData.class};
+
   public JAXBContextResolver() throws Exception {
     this.types = new HashSet<Class>(Arrays.asList(cTypes));
     this.context = new JSONJAXBContext(JSONConfiguration.natural().
         rootUnwrapping(false).build(), cTypes);
   }
-    
+
   @Override
   public JAXBContext getContext(Class<?> objectType) {
     return (types.contains(objectType)) ? context : null;
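
As the comment in the class says, every DAO type has to be listed in cTypes: getContext() hands Jersey the shared context only for registered classes and returns null otherwise, which makes the framework fall back to its default marshalling. A minimal standalone resolver of the same shape, simplified to a plain JAXBContext instead of the natural-JSON JSONJAXBContext the real class builds:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    import javax.ws.rs.ext.ContextResolver;
    import javax.ws.rs.ext.Provider;
    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.annotation.XmlRootElement;

    @Provider
    public class MiniContextResolver implements ContextResolver<JAXBContext> {

      @XmlRootElement
      public static class SampleInfo {        // stand-in for one DAO class
        public String value = "sample";
      }

      private final JAXBContext context;
      private final Set<Class> types;
      // every DAO class must be listed here, or getContext() will disown it
      private final Class[] cTypes = { SampleInfo.class };

      public MiniContextResolver() throws Exception {
        this.types = new HashSet<Class>(Arrays.asList(cTypes));
        this.context = JAXBContext.newInstance(cTypes);
      }

      @Override
      public JAXBContext getContext(Class<?> objectType) {
        // null means "not mine"; the framework then uses its default context
        return types.contains(objectType) ? context : null;
      }
    }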

Modified: hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java?rev=1231834&r1=1231833&r2=1231834&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java (original)
+++ hadoop/common/branches/branch-0.23-PB/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/webapp/JobBlock.java Mon Jan 16 04:24:24 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapreduce.v2.a
 
 import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.JOB_ID;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.util.StringHelper.ujoin;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._EVEN;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._INFO_WRAP;
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._ODD;
@@ -28,14 +29,22 @@ import static org.apache.hadoop.yarn.web
 import static org.apache.hadoop.yarn.webapp.view.JQueryUI._TH;
 
 import java.util.Date;
+import java.util.List;
 
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.webapp.dao.AMAttemptInfo;
 import org.apache.hadoop.mapreduce.v2.app.webapp.dao.JobInfo;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.mapreduce.v2.util.MRApps.TaskAttemptStateUI;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.DIV;
+import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
 import org.apache.hadoop.yarn.webapp.view.HtmlBlock;
 import org.apache.hadoop.yarn.webapp.view.InfoBlock;
 
@@ -62,6 +71,11 @@ public class JobBlock extends HtmlBlock 
         p()._("Sorry, ", jid, " not found.")._();
       return;
     }
+
+    List<AMInfo> amInfos = job.getAMInfos();
+    String amString =
+        amInfos.size() == 1 ? "ApplicationMaster" : "ApplicationMasters"; 
+
     JobInfo jinfo = new JobInfo(job, true);
     info("Job Overview").
         _("Job Name:", jinfo.getName()).
@@ -69,10 +83,40 @@ public class JobBlock extends HtmlBlock 
         _("Uberized:", jinfo.isUberized()).
         _("Started:", new Date(jinfo.getStartTime())).
         _("Elapsed:", StringUtils.formatTime(jinfo.getElapsedTime()));
-    html.
+    DIV<Hamlet> div = html.
       _(InfoBlock.class).
-      div(_INFO_WRAP).
+      div(_INFO_WRAP);
+
+    // MRAppMasters Table
+    TABLE<DIV<Hamlet>> table = div.table("#job");
+    table.
+      tr().
+      th(amString).
+      _().
+      tr().
+      th(_TH, "Attempt Number").
+      th(_TH, "Start Time").
+      th(_TH, "Node").
+      th(_TH, "Logs").
+      _();
+    for (AMInfo amInfo : amInfos) {
+      AMAttemptInfo attempt = new AMAttemptInfo(amInfo,
+          jinfo.getId(), jinfo.getUserName());
+
+      table.tr().
+        td(String.valueOf(attempt.getAttemptId())).
+        td(new Date(attempt.getStartTime()).toString()).
+        td().a(".nodelink", url("http://", attempt.getNodeHttpAddress()), 
+            attempt.getNodeHttpAddress())._().
+        td().a(".logslink", url(attempt.getLogsLink()), 
+            "logs")._().
+        _();
+    }
+
+    table._();
+    div._();
 
+    html.div(_INFO_WRAP).        
       // Tasks table
         table("#job").
           tr().


