hadoop-mapreduce-commits mailing list archives

From acmur...@apache.org
Subject svn commit: r1082677 [5/38] - in /hadoop/mapreduce/branches/MR-279: ./ assembly/ ivy/ mr-client/ mr-client/hadoop-mapreduce-client-app/ mr-client/hadoop-mapreduce-client-app/src/ mr-client/hadoop-mapreduce-client-app/src/main/ mr-client/hadoop-mapreduc...
Date Thu, 17 Mar 2011 20:21:54 GMT
Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,1169 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.mapred.MapReduceChildJVM;
+import org.apache.hadoop.mapred.Task;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
+import org.apache.hadoop.mapreduce.security.TokenCache;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
+import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.conf.YARNApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.AvroUtil;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.ContainerBuilderHelper;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerLaunchContext;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.yarn.LocalResource;
+import org.apache.hadoop.yarn.LocalResourceType;
+import org.apache.hadoop.yarn.LocalResourceVisibility;
+import org.apache.hadoop.yarn.Resource;
+import org.apache.hadoop.yarn.URL;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.Phase;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Implementation of the TaskAttempt interface. An attempt's lifecycle is
+ * driven by the event-based state machine defined below.
+ */
+@SuppressWarnings("all")
+public abstract class TaskAttemptImpl implements
+    org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt,
+      EventHandler<TaskAttemptEvent> {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptImpl.class);
+
+  protected final Configuration conf;
+  protected final Path jobFile;
+  protected final int partition;
+  protected final EventHandler eventHandler;
+  private final TaskAttemptID attemptId;
+  private final org.apache.hadoop.mapred.JobID oldJobId;
+  private final TaskAttemptListener taskAttemptListener;
+  private final OutputCommitter committer;
+  private final Resource resourceCapability;
+  private final String[] dataLocalHosts;
+  private final List<CharSequence> diagnostics = new ArrayList<CharSequence>();
+  private final Lock readLock;
+  private final Lock writeLock;
+  private Collection<Token<? extends TokenIdentifier>> fsTokens;
+  private Token<JobTokenIdentifier> jobToken;
+
+  private long launchTime;
+  private long finishTime;
+
+  private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
+    new CleanupContainerTransition();
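+  // Lifecycle overview: NEW -> UNASSIGNED -> ASSIGNED -> RUNNING
+  // [-> COMMIT_PENDING] -> SUCCESS_CONTAINER_CLEANUP -> SUCCEEDED; failures
+  // and kills detour through the FAIL_/KILL_CONTAINER_CLEANUP and
+  // *_TASK_CLEANUP states before ending in FAILED or KILLED. Each
+  // addTransition(preState, postState, eventType[, hook]) call below
+  // registers one arc of the state machine.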
+  private static final StateMachineFactory
+        <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+        stateMachineFactory
+    = new StateMachineFactory
+             <TaskAttemptImpl, TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+           (TaskAttemptState.NEW)
+
+     // Transitions from the NEW state.
+     .addTransition(TaskAttemptState.NEW, TaskAttemptState.UNASSIGNED,
+         TaskAttemptEventType.TA_SCHEDULE, new RequestContainerTransition())
+     .addTransition(TaskAttemptState.NEW, TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_KILL, new KilledTransition())
+
+     // Transitions from the UNASSIGNED state.
+     .addTransition(TaskAttemptState.UNASSIGNED,
+         TaskAttemptState.ASSIGNED, TaskAttemptEventType.TA_ASSIGNED,
+         new ContainerAssignedTransition())
+     .addTransition(TaskAttemptState.UNASSIGNED, TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
+             TaskAttemptState.KILLED, true))
+
+     // Transitions from the ASSIGNED state.
+     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.RUNNING,
+         TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
+         new LaunchedContainerTransition())
+     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
+         TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
+         new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
+     .addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.KILLED,
+         TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
+             TaskAttemptState.KILLED, false))
+
+     // Transitions from RUNNING state.
+     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+         TaskAttemptEventType.TA_UPDATE, new StatusUpdater())
+     .addTransition(TaskAttemptState.RUNNING, TaskAttemptState.RUNNING,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         new DiagnosticInformationUpdater())
+     // If no commit is required, the task goes directly to success
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+     // If a commit is required, the task goes through the commit-pending state.
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
+     // Failure handling while RUNNING
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+      //for handling container exit without sending the done or fail msg
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+         CLEANUP_CONTAINER_TRANSITION)
+     // Timeout handling while RUNNING
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+     // Kill handling
+     .addTransition(TaskAttemptState.RUNNING,
+         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         CLEANUP_CONTAINER_TRANSITION)
+
+     // Transitions from COMMIT_PENDING state
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
+         new StatusUpdater())
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+         new DiagnosticInformationUpdater())
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+         CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+         CLEANUP_CONTAINER_TRANSITION)
+     .addTransition(TaskAttemptState.COMMIT_PENDING,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
+
+     // Transitions from SUCCESS_CONTAINER_CLEANUP state
+     // kill and cleanup the container
+     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptState.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
+         new SucceededTransition())
+      // Ignore-able events
+     .addTransition(TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         TaskAttemptState.SUCCESS_CONTAINER_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_TIMED_OUT,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED))
+
+     // Transitions from FAIL_CONTAINER_CLEANUP state.
+     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptState.FAIL_TASK_CLEANUP,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
+      // Ignore-able events
+     .addTransition(TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         TaskAttemptState.FAIL_CONTAINER_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_TIMED_OUT))
+
+      // Transitions from KILL_CONTAINER_CLEANUP
+     .addTransition(TaskAttemptState.KILL_CONTAINER_CLEANUP,
+         TaskAttemptState.KILL_TASK_CLEANUP,
+         TaskAttemptEventType.TA_CONTAINER_CLEANED, new TaskCleanupTransition())
+     // Ignore-able events
+     .addTransition(
+         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+         TaskAttemptState.KILL_CONTAINER_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG,
+             TaskAttemptEventType.TA_TIMED_OUT))
+
+     // Transitions from FAIL_TASK_CLEANUP
+     // run the task cleanup
+     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
+         TaskAttemptState.FAILED, TaskAttemptEventType.TA_CLEANUP_DONE,
+         new FailedTransition())
+      // Ignore-able events
+     .addTransition(TaskAttemptState.FAIL_TASK_CLEANUP,
+         TaskAttemptState.FAIL_TASK_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG))
+
+     // Transitions from KILL_TASK_CLEANUP
+     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
+         TaskAttemptState.KILLED, TaskAttemptEventType.TA_CLEANUP_DONE,
+         new KilledTransition())
+     // Ignore-able events
+     .addTransition(TaskAttemptState.KILL_TASK_CLEANUP,
+         TaskAttemptState.KILL_TASK_CLEANUP,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG))
+
+      // Transitions from SUCCEEDED
+      .addTransition(TaskAttemptState.SUCCEEDED, //only possible for map attempts
+         TaskAttemptState.FAILED,
+         TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE,
+         new TooManyFetchFailureTransition())
+     // Ignore-able events for SUCCEEDED state
+     .addTransition(TaskAttemptState.SUCCEEDED,
+         TaskAttemptState.SUCCEEDED,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED))
+
+     // Ignore-able events for FAILED state
+     .addTransition(TaskAttemptState.FAILED, TaskAttemptState.FAILED,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG))
+
+     // Ignore-able events for KILLED state
+     .addTransition(TaskAttemptState.KILLED, TaskAttemptState.KILLED,
+         EnumSet.of(TaskAttemptEventType.TA_KILL,
+             TaskAttemptEventType.TA_ASSIGNED,
+             TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+             TaskAttemptEventType.TA_UPDATE,
+             TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+             TaskAttemptEventType.TA_COMMIT_PENDING,
+             TaskAttemptEventType.TA_DONE,
+             TaskAttemptEventType.TA_FAILMSG))
+
+     // create the topology tables
+     .installTopology();
+
+  private final StateMachine
+         <TaskAttemptState, TaskAttemptEventType, TaskAttemptEvent>
+    stateMachine;
+
+  private ContainerID containerID;
+  private String containerMgrAddress;
+  private WrappedJvmID jvmID;
+  private ContainerToken containerToken;
+  
+  //this takes a good amount of memory (~30KB). Instantiate it lazily
+  //and null it out once the task is launched.
+  private org.apache.hadoop.mapred.Task remoteTask;
+  
+  //this is the last status reported by the REMOTE running attempt
+  private TaskAttemptStatus reportedStatus;
+
+  public TaskAttemptImpl(TaskID taskId, int i, EventHandler eventHandler,
+      TaskAttemptListener taskAttemptListener, Path jobFile, int partition,
+      Configuration conf, String[] dataLocalHosts, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+    oldJobId = TypeConverter.fromYarn(taskId.jobID);
+    this.conf = conf;
+    attemptId = new TaskAttemptID();
+    attemptId.taskID = taskId;
+    attemptId.id = i;
+    this.taskAttemptListener = taskAttemptListener;
+
+    // Initialize reportedStatus
+    reportedStatus = new TaskAttemptStatus();
+    initTaskAttemptStatus(reportedStatus);
+
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    readLock = readWriteLock.readLock();
+    writeLock = readWriteLock.writeLock();
+
+    this.fsTokens = fsTokens;
+    this.jobToken = jobToken;
+    this.eventHandler = eventHandler;
+    this.committer = committer;
+    this.jobFile = jobFile;
+    this.partition = partition;
+
+    //TODO:create the resource reqt for this Task attempt
+    this.resourceCapability = new Resource();
+    this.resourceCapability.memory =  getMemoryRequired(conf, taskId.taskType);
+    this.dataLocalHosts = dataLocalHosts;
+
+    // This "this leak" is okay because the retained pointer is in an
+    //  instance variable.
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  private int getMemoryRequired(Configuration conf, TaskType taskType) {
+    int memory = 1024;
+    if (taskType == TaskType.MAP) {
+      memory = conf.getInt(MRJobConfig.MAP_MEMORY_MB, 1024);
+    } else if (taskType == TaskType.REDUCE) {
+      memory = conf.getInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
+    }
+
+    return memory;
+  }
+
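+  // Builds a LocalResource record (URL, type, visibility, size, timestamp)
+  // from the file's status so the NodeManager can localize the file.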
+  private static LocalResource getLocalResource(FileContext fc, Path file, 
+      LocalResourceType type, LocalResourceVisibility visibility) 
+  throws IOException {
+    FileStatus fstat = fc.getFileStatus(file);
+    LocalResource resource = new LocalResource();
+    resource.resource = AvroUtil.getYarnUrlFromPath(fstat.getPath());
+    resource.type = type;
+    resource.state = visibility;
+    resource.size = fstat.getLen();
+    resource.timestamp = fstat.getModificationTime();
+    return resource;
+  }
+  
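+  // Assembles the ContainerLaunchContext for this attempt: localized
+  // resources (job jar, job conf, distributed cache), security tokens,
+  // shuffle service data, environment and the child-JVM command line.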
+  private ContainerLaunchContext getContainer() {
+
+    ContainerLaunchContext container = new ContainerLaunchContext();
+    container.serviceData = new HashMap<CharSequence, ByteBuffer>();
+    container.resources = new HashMap<CharSequence, LocalResource>();
+    container.env = new HashMap<CharSequence, CharSequence>();
+    
+
+    try {
+      FileContext remoteFS = FileContext.getFileContext(conf);
+      
+      Path localizedJobConf = new Path(YARNApplicationConstants.JOB_CONF_FILE);
+      remoteTask.setJobFile(localizedJobConf.toString()); // Screwed!!!!!!
+      URL jobConfFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(localizedJobConf);
+      LOG.info("The job-conf file on the remote FS is " + jobConfFileOnRemoteFS);
+      
+      Path jobJar = remoteFS.makeQualified(new Path(remoteTask.getConf().get(MRJobConfig.JAR)));
+      URL jobJarFileOnRemoteFS = AvroUtil.getYarnUrlFromPath(jobJar);
+      container.resources.put(YARNApplicationConstants.JOB_JAR,
+          getLocalResource(remoteFS, jobJar, 
+              LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      LOG.info("The job-jar file on the remote FS is " + jobJarFileOnRemoteFS);
+
+      Path jobSubmitDir =
+          new Path(conf.get(YARNApplicationConstants.APPS_STAGING_DIR_KEY),
+              oldJobId.toString());
+      Path jobTokenFile =
+          remoteFS.makeQualified(new Path(jobSubmitDir,
+              YarnConfiguration.APPLICATION_TOKENS_FILE));
+      URL applicationTokenFileOnRemoteFS =
+          AvroUtil.getYarnUrlFromPath(jobTokenFile);
+      // TODO: Looks like this is not needed. Revisit during localization
+      // cleanup.
+      //container.resources_todo.put(YarnConfiguration.APPLICATION_TOKENS_FILE,
+      //    getLocalResource(remoteFS, jobTokenFile, 
+      //        LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      container.resources.put(YARNApplicationConstants.JOB_CONF_FILE,
+          getLocalResource(remoteFS,
+            new Path(jobSubmitDir, YARNApplicationConstants.JOB_CONF_FILE),
+            LocalResourceType.FILE, LocalResourceVisibility.APPLICATION));
+      LOG.info("The application token file on the remote FS is "
+          + applicationTokenFileOnRemoteFS);
+
+      // Setup DistributedCache
+      setupDistributedCache(conf, container);
+
+      // Set up tokens
+      Credentials taskCredentials = new Credentials();
+
+      if (UserGroupInformation.isSecurityEnabled()) {
+        // Add file-system tokens
+        for (Token<? extends TokenIdentifier> token : fsTokens) {
+          LOG.info("Putting fs-token for NM use for launching container : "
+              + token.getIdentifier().toString());
+          taskCredentials.addToken(token.getService(), token);
+        }
+      }
+
+      // LocalStorageToken is needed irrespective of whether security is enabled
+      // or not.
+      TokenCache.setJobToken(jobToken, taskCredentials);
+
+      DataOutputBuffer containerTokens_dob = new DataOutputBuffer();
+      LOG.info("Size of containertokens_dob is "
+          + taskCredentials.numberOfTokens());
+      taskCredentials.writeTokenStorageToStream(containerTokens_dob);
+      container.containerTokens =
+          ByteBuffer.wrap(containerTokens_dob.getData(), 0,
+              containerTokens_dob.getLength());
+
+      // Add shuffle token
+      LOG.info("Putting shuffle token in serviceData");
+      DataOutputBuffer jobToken_dob = new DataOutputBuffer();
+      jobToken.write(jobToken_dob);
+      // TODO: should depend on ShuffleHandler
+      container.serviceData.put("mapreduce.shuffle", 
+          ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength()));
+
+      MRApps.setInitialClasspath(container.env);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    
+    container.id = containerID;
+    container.user = conf.get(MRJobConfig.USER_NAME); // TODO: Fix
+
+    File workDir = new File(ContainerBuilderHelper.getWorkDir());
+    String logDir = new File(workDir, "logs").toString();
+    String childTmpDir = new File(workDir, "tmp").toString();
+    String javaHome = "${JAVA_HOME}";
+    String nmLdLibraryPath =
+        ContainerBuilderHelper.getEnvVar("LD_LIBRARY_PATH");
+    List<String> classPaths = new ArrayList<String>();
+
+    String localizedApplicationTokensFile =
+        new File(workDir, YarnConfiguration.APPLICATION_TOKENS_FILE)
+            .toString();
+    classPaths.add(YARNApplicationConstants.JOB_JAR);
+    classPaths.add(YARNApplicationConstants.YARN_MAPREDUCE_APP_JAR_PATH);
+    classPaths.add(workDir.toString()); // TODO
+
+    // Construct the command for launching the task JVM
+    container.command =
+        MapReduceChildJVM.getVMCommand(taskAttemptListener.getAddress(),
+            remoteTask, javaHome, workDir.toString(), logDir, 
+            childTmpDir, jvmID);
+
+    MapReduceChildJVM.setVMEnv(container.env, classPaths, workDir.toString(),
+        nmLdLibraryPath, remoteTask, localizedApplicationTokensFile);
+
+    container.resource = resourceCapability;
+    return container;
+  }
+
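+  // Registers distributed-cache archives and files as container-local
+  // resources, using the timestamps, sizes and visibilities recorded at
+  // job-submission time.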
+  private void setupDistributedCache(Configuration conf, 
+      ContainerLaunchContext container) throws IOException {
+    
+    // Cache archives
+    parseDistributedCacheArtifacts(container, LocalResourceType.ARCHIVE, 
+        DistributedCache.getCacheArchives(conf), 
+        DistributedCache.getArchiveTimestamps(conf), 
+        getFileSizes(conf, MRJobConfig.CACHE_ARCHIVES_SIZES), 
+        DistributedCache.getArchiveVisibilities(conf), 
+        DistributedCache.getArchiveClassPaths(conf));
+    
+    // Cache files
+    parseDistributedCacheArtifacts(container, LocalResourceType.FILE, 
+        DistributedCache.getCacheFiles(conf),
+        DistributedCache.getFileTimestamps(conf),
+        getFileSizes(conf, MRJobConfig.CACHE_FILES_SIZES),
+        DistributedCache.getFileVisibilities(conf),
+        DistributedCache.getFileClassPaths(conf));
+  }
+
+  // TODO - Move this to MR!
+  // Use TaskDistributedCacheManager.CacheFiles.makeCacheFiles(URI[], 
+  // long[], boolean[], Path[], FileType)
+  private static void parseDistributedCacheArtifacts(
+      ContainerLaunchContext container, LocalResourceType type,
+      URI[] uris, long[] timestamps, long[] sizes, boolean visibilities[], 
+      Path[] classpaths) throws IOException {
+
+    if (uris != null) {
+      // Sanity check
+      if ((uris.length != timestamps.length) || (uris.length != sizes.length) ||
+          (uris.length != visibilities.length)) {
+        throw new IllegalArgumentException("Invalid specification for " +
+            "distributed-cache artifacts of type " + type + " :" +
+            " #uris=" + uris.length +
+            " #timestamps=" + timestamps.length +
+            " #sizes=" + sizes.length +
+            " #visibilities=" + visibilities.length
+            );
+      }
+      
+      Map<String, Path> classPaths = new HashMap<String, Path>();
+      if (classpaths != null) {
+        for (Path p : classpaths) {
+          classPaths.put(p.toUri().getPath().toString(), p);
+        }
+      }
+      for (int i = 0; i < uris.length; ++i) {
+        URI u = uris[i];
+        Path p = new Path(u.toString());
+        // Add URI fragment or just the filename
+        Path name = new Path((null == u.getFragment())
+          ? p.getName()
+          : u.getFragment());
+        if (name.isAbsolute()) {
+          throw new IllegalArgumentException("Resource name must be relative");
+        }
+        container.resources.put(
+            name.toUri().getPath(),
+            BuilderUtils.newLocalResource(
+                uris[i], type, 
+                visibilities[i]
+                  ? LocalResourceVisibility.PUBLIC
+                  : LocalResourceVisibility.PRIVATE,
+                sizes[i], timestamps[i])
+        );
+        if (classPaths.containsKey(u.getPath())) {
+          addCacheArtifactToClassPath(container, name.toUri().getPath());
+        }
+      }
+    }
+  }
+
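+  // Appends a localized cache artifact to the CLASSPATH entry of the
+  // container's environment.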
+  private static final String CLASSPATH = "CLASSPATH";
+  private static void addCacheArtifactToClassPath(
+      ContainerLaunchContext container, String fileName) {
+    CharSequence classpath = container.env.get(CLASSPATH);
+    if (classpath == null) {
+      classpath = fileName;
+    } else {
+      classpath = classpath + ":" + fileName;
+    }
+    container.env.put(CLASSPATH, classpath);
+  }
+  
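+  // Parses the comma-separated list of sizes stored under the given
+  // configuration key.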
+  // TODO - Move this to MR!
+  private static long[] getFileSizes(Configuration conf, String key) {
+    String[] strs = conf.getStrings(key);
+    if (strs == null) {
+      return null;
+    }
+    long[] result = new long[strs.length];
+    for(int i=0; i < strs.length; ++i) {
+      result[i] = Long.parseLong(strs[i]);
+    }
+    return result;
+  }
+  
+  @Override
+  public ContainerID getAssignedContainerID() {
+    readLock.lock();
+    try {
+      return containerID;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getLaunchTime() {
+    readLock.lock();
+    try {
+      return launchTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public long getFinishTime() {
+    readLock.lock();
+    try {
+      return finishTime;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /** If a container is assigned, returns the container manager's address;
+   * otherwise null.
+   */
+  @Override
+  public String getAssignedContainerMgrAddress() {
+    readLock.lock();
+    try {
+      return containerMgrAddress;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  protected abstract org.apache.hadoop.mapred.Task createRemoteTask();
+
+  protected abstract int getPriority();
+
+  @Override
+  public TaskAttemptID getID() {
+    return attemptId;
+  }
+
+  @Override
+  public boolean isFinished() {
+    readLock.lock();
+    try {
+      // TODO: Use stateMachine level method?
+      return (getState() == TaskAttemptState.SUCCEEDED || 
+          getState() == TaskAttemptState.FAILED ||
+          getState() == TaskAttemptState.KILLED);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskAttemptReport getReport() {
+    TaskAttemptReport result = new TaskAttemptReport();
+    readLock.lock();
+    try {
+      result.id = attemptId;
+      //take the LOCAL state of attempt
+      //DO NOT take from reportedStatus
+      result.state = getState();
+      result.progress = reportedStatus.progress;
+      result.startTime = launchTime;
+      result.finishTime = finishTime;
+      result.diagnosticInfo = reportedStatus.diagnosticInfo;
+      result.phase = reportedStatus.phase;
+      result.stateString = reportedStatus.stateString;
+      result.counters = getCounters();
+      return result;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public List<CharSequence> getDiagnostics() {
+    List<CharSequence> result = new ArrayList<CharSequence>();
+    readLock.lock();
+    try {
+      result.addAll(diagnostics);
+      return result;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Counters getCounters() {
+    readLock.lock();
+    try {
+      Counters counters = reportedStatus.counters;
+      if (counters == null) {
+        counters = new Counters();
+        counters.groups = new HashMap<CharSequence, CounterGroup>();
+      }
+      return counters;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    readLock.lock();
+    try {
+      return reportedStatus.progress;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskAttemptState getState() {
+    readLock.lock();
+    try {
+      return stateMachine.getCurrentState();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
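+  // Single entry point for all attempt events: transitions are serialized
+  // under the write lock, and an invalid transition is escalated to the job
+  // as an internal error.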
+  @Override
+  public void handle(TaskAttemptEvent event) {
+    LOG.info("Processing " + event.getTaskAttemptID() +
+        " of type " + event.getType());
+    writeLock.lock();
+    try {
+      final TaskAttemptState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        eventHandler.handle(new JobDiagnosticsUpdateEvent(
+            this.attemptId.taskID.jobID, "Invalid event " + event.getType() + 
+            " on TaskAttempt " + this.attemptId));
+        eventHandler.handle(new JobEvent(this.attemptId.taskID.jobID,
+            JobEventType.INTERNAL_ERROR));
+      }
+      if (oldState != getState()) {
+          LOG.info(attemptId + " TaskAttempt Transitioned from " 
+           + oldState + " to "
+           + getState());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  //always called in write lock
+  private void setFinishTime() {
+    //set the finish time only if launch time is set
+    if (launchTime != 0) {
+      finishTime = System.currentTimeMillis();
+    }
+  }
+
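+  // All container requests currently go out against the default rack only.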
+  private static String[] racks = new String[] {NetworkTopology.DEFAULT_RACK};
+  private static class RequestContainerTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      // Tell any speculator that we're requesting a container
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent(taskAttempt.getID().taskID, +1));
+      //request for container
+      taskAttempt.eventHandler.handle(
+          new ContainerRequestEvent(taskAttempt.attemptId, 
+              taskAttempt.resourceCapability, 
+              taskAttempt.getPriority(), taskAttempt.dataLocalHosts, racks));
+    }
+  }
+
+  private static class ContainerAssignedTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(final TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      TaskAttemptContainerAssignedEvent cEvent = 
+        (TaskAttemptContainerAssignedEvent) event;
+      taskAttempt.containerID = cEvent.getContainerID();
+      taskAttempt.containerMgrAddress = cEvent.getContainerManagerAddress();
+      taskAttempt.containerToken = cEvent.getContainerToken();
+      taskAttempt.remoteTask = taskAttempt.createRemoteTask();
+      taskAttempt.jvmID = new WrappedJvmID(
+          taskAttempt.remoteTask.getTaskID().getJobID(), 
+          taskAttempt.remoteTask.isMapTask(), taskAttempt.containerID.id);
+      
+      //launch the container
+      //create the container object to be launched for a given Task attempt
+      taskAttempt.eventHandler.handle(
+          new ContainerRemoteLaunchEvent(taskAttempt.attemptId, 
+              taskAttempt.containerID, 
+              taskAttempt.containerMgrAddress, taskAttempt.containerToken) {
+        @Override
+        public ContainerLaunchContext getContainer() {
+          return taskAttempt.getContainer();
+        }
+        @Override
+        public Task getRemoteTask() {
+          return taskAttempt.remoteTask;
+        }
+      });
+
+      // send event to speculator that our container needs are satisfied
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+    }
+  }
+
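+  // Releases the attempt's container (or withdraws the pending request)
+  // and moves the attempt to the given terminal state.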
+  private static class DeallocateContainerTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    private final TaskAttemptState finalState;
+    private final boolean withdrawsContainerRequest;
+    DeallocateContainerTransition
+        (TaskAttemptState finalState, boolean withdrawsContainerRequest) {
+      this.finalState = finalState;
+      this.withdrawsContainerRequest = withdrawsContainerRequest;
+    }
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      //set the finish time
+      taskAttempt.setFinishTime();
+      //send the deallocate event to ContainerAllocator
+      taskAttempt.eventHandler.handle(
+          new ContainerAllocatorEvent(taskAttempt.attemptId,
+          ContainerAllocator.EventType.CONTAINER_DEALLOCATE));
+
+      // send event to speculator that we withdraw our container needs, if
+      //  we're transitioning out of UNASSIGNED
+      if (withdrawsContainerRequest) {
+        taskAttempt.eventHandler.handle
+            (new SpeculatorEvent(taskAttempt.getID().taskID, -1));
+      }
+
+      switch(finalState) {
+        case FAILED:
+          taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+              taskAttempt.attemptId,
+              TaskEventType.T_ATTEMPT_FAILED));
+          break;
+        case KILLED:
+          taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+              taskAttempt.attemptId,
+              TaskEventType.T_ATTEMPT_KILLED));
+          break;
+      }
+    }
+  }
+
+  private static class LaunchedContainerTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      //set the launch time
+      taskAttempt.launchTime = System.currentTimeMillis();
+      // register it with the TaskAttemptListener so that it starts
+      // listening for it
+      taskAttempt.taskAttemptListener.register(
+          taskAttempt.attemptId, taskAttempt.remoteTask, taskAttempt.jvmID);
+      TaskAttemptStartedEvent tase =
+        new TaskAttemptStartedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            taskAttempt.launchTime,
+            "tracker", 0);
+      taskAttempt.eventHandler.handle
+          (new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tase));
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent
+              (taskAttempt.attemptId, true, System.currentTimeMillis()));
+      //null out the remoteTask reference as it is no longer needed,
+      //freeing up the memory
+      taskAttempt.remoteTask = null;
+      
+      //tell the Task that attempt has started
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId, 
+         TaskEventType.T_ATTEMPT_LAUNCHED));
+    }
+  }
+   
+  private static class CommitPendingTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId, 
+         TaskEventType.T_ATTEMPT_COMMIT_PENDING));
+    }
+  }
+
+  private static class TaskCleanupTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(taskAttempt.conf,
+            TypeConverter.fromYarn(taskAttempt.attemptId));
+      taskAttempt.eventHandler.handle(new TaskCleanupEvent(
+          taskAttempt.attemptId,
+          taskAttempt.committer,
+          taskContext));
+    }
+  }
+
+  private static class SucceededTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      //set the finish time
+      taskAttempt.setFinishTime();
+      String taskType = 
+          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType).toString();
+      LOG.info("In TaskAttemptImpl taskType: " + taskType);
+      if (taskType.equals("MAP")) {
+          MapAttemptFinishedEvent mfe =
+            new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            TaskAttemptState.SUCCEEDED.toString(),
+            taskAttempt.finishTime,
+            taskAttempt.finishTime, "hostname",
+            TaskAttemptState.SUCCEEDED.toString(),
+            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+            taskAttempt.eventHandler.handle(
+              new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+      } else {
+          ReduceAttemptFinishedEvent rfe =
+            new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+            TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+            TaskAttemptState.SUCCEEDED.toString(),
+            taskAttempt.finishTime,
+            taskAttempt.finishTime,
+            taskAttempt.finishTime, "hostname",
+            TaskAttemptState.SUCCEEDED.toString(),
+            TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+            taskAttempt.eventHandler.handle(
+              new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+      }
+          /*
+      TaskAttemptFinishedEvent tfe =
+          new TaskAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+          TaskAttemptState.SUCCEEDED.toString(), 
+          taskAttempt.reportedStatus.finishTime, "hostname", 
+          TaskAttemptState.SUCCEEDED.toString(), 
+          TypeConverter.fromYarn(taskAttempt.getCounters()));
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tfe));
+      */
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+   }
+  }
+
+  private static class FailedTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      //set the finish time
+      taskAttempt.setFinishTime();
+      TaskAttemptUnsuccessfulCompletionEvent ta =
+          new TaskAttemptUnsuccessfulCompletionEvent(
+          TypeConverter.fromYarn(taskAttempt.attemptId),
+          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+          TaskAttemptState.FAILED.toString(),
+          taskAttempt.finishTime,
+          "hostname",
+          taskAttempt.reportedStatus.diagnosticInfo.toString());
+      taskAttempt.eventHandler.handle(
+          new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, ta));
+      if (taskAttempt.attemptId.taskID.taskType == TaskType.MAP) {
+        MapAttemptFinishedEvent mfe =
+           new MapAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+           TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+           TaskAttemptState.FAILED.toString(),
+           taskAttempt.finishTime,
+           taskAttempt.finishTime, "hostname",
+           TaskAttemptState.FAILED.toString(),
+           TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+           taskAttempt.eventHandler.handle(
+             new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, mfe));
+      } else {
+         ReduceAttemptFinishedEvent rfe =
+           new ReduceAttemptFinishedEvent(TypeConverter.fromYarn(taskAttempt.attemptId),
+           TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+           TaskAttemptState.FAILED.toString(),
+           taskAttempt.finishTime,
+           taskAttempt.finishTime,
+           taskAttempt.finishTime, "hostname",
+           TaskAttemptState.FAILED.toString(),
+           TypeConverter.fromYarn(taskAttempt.getCounters()),null);
+           taskAttempt.eventHandler.handle(
+             new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, rfe));
+      }
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_FAILED));
+    }
+  }
+
+  private static class TooManyFetchFailureTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+      //add to diagnostic
+      taskAttempt.addDiagnosticInfo("Too Many fetch failures.Failing the attempt");
+      //set the finish time
+      taskAttempt.setFinishTime();
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+    }
+  }
+
+  private static class KilledTransition implements
+      SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt,
+        TaskAttemptEvent event) {
+      //set the finish time
+      taskAttempt.setFinishTime();
+      TaskAttemptUnsuccessfulCompletionEvent tke =
+          new TaskAttemptUnsuccessfulCompletionEvent(
+          TypeConverter.fromYarn(taskAttempt.attemptId),
+          TypeConverter.fromYarn(taskAttempt.attemptId.taskID.taskType),
+          TaskAttemptState.KILLED.toString(),
+          taskAttempt.finishTime,
+          TaskAttemptState.KILLED.toString(),
+          taskAttempt.reportedStatus.diagnosticInfo.toString());
+      taskAttempt.eventHandler.handle(
+          new JobHistoryEvent(taskAttempt.attemptId.taskID.jobID, tke));
+      taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+          taskAttempt.attemptId,
+          TaskEventType.T_ATTEMPT_KILLED));
+    }
+  }
+
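+  // Stops listening for the attempt and asks the ContainerLauncher to kill
+  // and clean up the remote container.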
+  private static class CleanupContainerTransition implements
+       SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      // unregister it from the TaskAttemptListener so that it stops
+      // listening for it
+      taskAttempt.taskAttemptListener.unregister(
+          taskAttempt.attemptId, taskAttempt.jvmID);
+      //send the cleanup event to containerLauncher
+      taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+          taskAttempt.attemptId, 
+          taskAttempt.containerID, taskAttempt.containerMgrAddress,
+          taskAttempt.containerToken,
+          ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+    }
+  }
+
+  private void addDiagnosticInfo(CharSequence diag) {
+    if (diag != null && diag.length() > 0) {
+      diagnostics.add(diag);
+    }
+  }
+
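+  // Applies a heartbeat status report: stores it, forwards it to the
+  // speculator, and surfaces any reported fetch failures to the job.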
+  private static class StatusUpdater 
+       implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      // Status update calls don't really change the state of the attempt.
+      TaskAttemptStatus newReportedStatus =
+          ((TaskAttemptStatusUpdateEvent) event)
+              .getReportedTaskAttemptStatus();
+      // Now switch the information in the reportedStatus
+      taskAttempt.reportedStatus = newReportedStatus;
+
+      // send event to speculator about the reported status
+      taskAttempt.eventHandler.handle
+          (new SpeculatorEvent
+              (taskAttempt.reportedStatus, System.currentTimeMillis()));
+      
+      //add to diagnostic
+      taskAttempt.addDiagnosticInfo(newReportedStatus.diagnosticInfo);
+      
+      //if fetch failures are present, send the fetch-failure event to the job;
+      //this will only happen for reduce attempts
+      if (taskAttempt.reportedStatus.fetchFailedMaps != null && 
+          taskAttempt.reportedStatus.fetchFailedMaps.size() > 0) {
+        taskAttempt.eventHandler.handle(new JobTaskAttemptFetchFailureEvent(
+            taskAttempt.attemptId, taskAttempt.reportedStatus.fetchFailedMaps));
+      }
+    }
+  }
+
+  private static class DiagnosticInformationUpdater 
+        implements SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+    @Override
+    public void transition(TaskAttemptImpl taskAttempt, 
+        TaskAttemptEvent event) {
+      TaskAttemptDiagnosticsUpdateEvent diagEvent =
+          (TaskAttemptDiagnosticsUpdateEvent) event;
+      LOG.info("Diagnostics report from " + taskAttempt.attemptId + ": "
+          + diagEvent.getDiagnosticInfo());
+      taskAttempt.addDiagnosticInfo(diagEvent.getDiagnosticInfo());
+    }
+  }
+
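+  // Seeds an empty status for a freshly constructed attempt.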
+  private void initTaskAttemptStatus(TaskAttemptStatus result) {
+    result.progress = 0.0f;
+    result.diagnosticInfo = "";
+    result.phase = Phase.STARTING;
+    result.stateString = "NEW";
+    Counters counters = new Counters();
+    counters.groups = new HashMap<CharSequence, CounterGroup>();
+    result.counters = counters;
+  }
+
+}

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,743 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.job.impl;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
+import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobMapTaskRescheduledEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.lib.TypeConverter;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
+import org.apache.hadoop.yarn.state.MultipleArcTransition;
+import org.apache.hadoop.yarn.state.SingleArcTransition;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.mapreduce.v2.api.CounterGroup;
+import org.apache.hadoop.mapreduce.v2.api.Counters;
+import org.apache.hadoop.mapreduce.v2.api.JobID;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+import org.apache.hadoop.mapreduce.v2.api.TaskID;
+import org.apache.hadoop.mapreduce.v2.api.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.TaskType;
+
+/**
+ * Implementation of the Task interface.
+ */
+public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
+
+  private static final Log LOG = LogFactory.getLog(TaskImpl.class);
+
+  protected final Configuration conf;
+  protected final Path jobFile;
+  protected final OutputCommitter committer;
+  protected final int partition;
+  protected final TaskAttemptListener taskAttemptListener;
+  protected final EventHandler eventHandler;
+  private final TaskID taskId;
+  private Map<TaskAttemptID, TaskAttempt> attempts;
+  private final int maxAttempts;
+  private final Lock readLock;
+  private final Lock writeLock;
+  protected Collection<Token<? extends TokenIdentifier>> fsTokens;
+  protected Token<JobTokenIdentifier> jobToken;
+  
+  // counts the number of attempts that are either running or will be
+  //  running once they are assigned a Container
+  private int numberUncompletedAttempts = 0;
+
+  private static final SingleArcTransition<TaskImpl, TaskEvent> 
+     ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
+  private static final SingleArcTransition<TaskImpl, TaskEvent> 
+     KILL_TRANSITION = new KillTransition();
+
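+  // Task-level state machine: NEW -> SCHEDULED -> RUNNING ->
+  // SUCCEEDED/FAILED, with T_KILL routing through KILL_WAIT until the
+  // live attempts have been killed.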
+  private static final StateMachineFactory
+               <TaskImpl, TaskState, TaskEventType, TaskEvent> 
+            stateMachineFactory 
+           = new StateMachineFactory<TaskImpl, TaskState, TaskEventType, TaskEvent>
+               (TaskState.NEW)
+
+    // define the state machine of Task
+
+    // Transitions from NEW state
+    .addTransition(TaskState.NEW, TaskState.SCHEDULED, 
+        TaskEventType.T_SCHEDULE, new InitialScheduleTransition())
+    .addTransition(TaskState.NEW, TaskState.KILLED, 
+        TaskEventType.T_KILL, new KillNewTransition())
+
+    // Transitions from SCHEDULED state
+    //when the first attempt is launched, the task state is set to RUNNING
+    .addTransition(TaskState.SCHEDULED, TaskState.RUNNING, 
+        TaskEventType.T_ATTEMPT_LAUNCHED)
+    .addTransition(TaskState.SCHEDULED, TaskState.KILL_WAIT, 
+        TaskEventType.T_KILL, KILL_TRANSITION)
+    .addTransition(TaskState.SCHEDULED, TaskState.SCHEDULED, 
+        TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
+    .addTransition(TaskState.SCHEDULED, 
+        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), 
+        TaskEventType.T_ATTEMPT_FAILED, 
+        new AttemptFailedTransition())
+
+    // Transitions from RUNNING state
+    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+        TaskEventType.T_ATTEMPT_LAUNCHED) //more attempts may start later
+    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+        TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+        new AttemptCommitPendingTransition())
+    .addTransition(TaskState.RUNNING, TaskState.RUNNING,
+        TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition())
+    .addTransition(TaskState.RUNNING, TaskState.SUCCEEDED, 
+        TaskEventType.T_ATTEMPT_SUCCEEDED,
+        new AttemptSucceededTransition())
+    .addTransition(TaskState.RUNNING, TaskState.RUNNING, 
+        TaskEventType.T_ATTEMPT_KILLED,
+        ATTEMPT_KILLED_TRANSITION)
+    .addTransition(TaskState.RUNNING, 
+        EnumSet.of(TaskState.RUNNING, TaskState.FAILED), 
+        TaskEventType.T_ATTEMPT_FAILED,
+        new AttemptFailedTransition())
+    .addTransition(TaskState.RUNNING, TaskState.KILL_WAIT, 
+        TaskEventType.T_KILL, KILL_TRANSITION)
+
+    // Transitions from KILL_WAIT state
+    .addTransition(TaskState.KILL_WAIT,
+        EnumSet.of(TaskState.KILL_WAIT, TaskState.KILLED),
+        TaskEventType.T_ATTEMPT_KILLED,
+        new KillWaitAttemptKilledTransition())
+    // Ignore-able transitions.
+    .addTransition(
+        TaskState.KILL_WAIT,
+        TaskState.KILL_WAIT,
+        EnumSet.of(TaskEventType.T_KILL,
+            TaskEventType.T_ATTEMPT_LAUNCHED,
+            TaskEventType.T_ATTEMPT_COMMIT_PENDING,
+            TaskEventType.T_ATTEMPT_FAILED,
+            TaskEventType.T_ATTEMPT_SUCCEEDED,
+            TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+    // Transitions from SUCCEEDED state
+    .addTransition(TaskState.SUCCEEDED, //only possible for map tasks
+        EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED),
+        TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition())
+    // Ignore-able transitions.
+    .addTransition(
+        TaskState.SUCCEEDED, TaskState.SUCCEEDED,
+        EnumSet.of(TaskEventType.T_KILL,
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
+            TaskEventType.T_ATTEMPT_LAUNCHED))
+
+    // Transitions from FAILED state        
+    .addTransition(TaskState.FAILED, TaskState.FAILED,
+        EnumSet.of(TaskEventType.T_KILL,
+                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+    // Transitions from KILLED state
+    .addTransition(TaskState.KILLED, TaskState.KILLED,
+        EnumSet.of(TaskEventType.T_KILL,
+                   TaskEventType.T_ADD_SPEC_ATTEMPT))
+
+    // create the topology tables
+    .installTopology();
+
+  private final StateMachine<TaskState, TaskEventType, TaskEvent>
+    stateMachine;
+  
+  protected int nextAttemptNumber;
+
+  //should be set to the attempt that first reports COMMIT_PENDING
+  private TaskAttemptID commitAttempt;
+
+  private TaskAttemptID successfulAttempt;
+
+  private int failedAttempts;
+  private int finishedAttempts; //total of succeeded, failed and killed attempts
+
+  @Override
+  public TaskState getState() {
+    return stateMachine.getCurrentState();
+  }
+
+  public TaskImpl(JobID jobId, TaskType taskType, int partition,
+      EventHandler eventHandler, Path remoteJobConfFile, Configuration conf,
+      TaskAttemptListener taskAttemptListener, OutputCommitter committer,
+      Token<JobTokenIdentifier> jobToken,
+      Collection<Token<? extends TokenIdentifier>> fsTokens) {
+    this.conf = conf;
+    this.jobFile = remoteJobConfFile;
+    ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    readLock = readWriteLock.readLock();
+    writeLock = readWriteLock.writeLock();
+    this.attempts = Collections.emptyMap();
+    // This overridable method call is okay in a constructor because we
+    //  have a convention that none of the overrides depends on any
+    //  fields that need initialization.
+    maxAttempts = getMaxAttempts();
+    taskId = new TaskID();
+    taskId.jobID = jobId;
+    taskId.id = partition;
+    taskId.taskType = taskType;
+    this.partition = partition;
+    this.taskAttemptListener = taskAttemptListener;
+    this.eventHandler = eventHandler;
+    this.committer = committer;
+    this.fsTokens = fsTokens;
+    this.jobToken = jobToken;
+
+    // This "this leak" is okay because the retained pointer is in an
+    //  instance variable.
+    stateMachine = stateMachineFactory.make(this);
+  }
+
+  @Override
+  public Map<TaskAttemptID, TaskAttempt> getAttempts() {
+    readLock.lock();
+
+    try {
+      if (attempts.size() <= 1) {
+        return attempts;
+      }
+      
+      Map<TaskAttemptID, TaskAttempt> result
+          = new LinkedHashMap<TaskAttemptID, TaskAttempt>();
+      result.putAll(attempts);
+
+      return result;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskAttempt getAttempt(TaskAttemptID attemptID) {
+    readLock.lock();
+    try {
+      return attempts.get(attemptID);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskID getID() {
+    return taskId;
+  }
+
+  @Override
+  public boolean isFinished() {
+    readLock.lock();
+    try {
+     // TODO: Use stateMachine level method?
+      return (getState() == TaskState.SUCCEEDED ||
+          getState() == TaskState.FAILED ||
+          getState() == TaskState.KILLED);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public TaskReport getReport() {
+    TaskReport report = new TaskReport();
+    readLock.lock();
+    try {
+      report.id = taskId;
+      report.startTime = getLaunchTime();
+      report.finishTime = getFinishTime();
+      report.state = getState();
+      report.progress = getProgress();
+      report.counters = getCounters();
+      report.runningAttempts = new ArrayList<TaskAttemptID>();
+      report.runningAttempts.addAll(attempts.keySet());
+      report.successfulAttempt = successfulAttempt;
+      
+      report.diagnostics = new ArrayList<CharSequence>();
+      for (TaskAttempt att : attempts.values()) {
+        String prefix = "AttemptID:" + att.getID() + " Info:";
+        for (CharSequence cs : att.getDiagnostics()) {
+          report.diagnostics.add(prefix + cs);
+        }
+      }
+      return report;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public Counters getCounters() {
+    Counters counters = null;
+    readLock.lock();
+    try {
+      TaskAttempt bestAttempt = selectBestAttempt();
+      if (bestAttempt != null) {
+        counters = bestAttempt.getCounters();
+      } else {
+        counters = new Counters();
+        counters.groups = new HashMap<CharSequence, CounterGroup>();
+      }
+      return counters;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public float getProgress() {
+    readLock.lock();
+    try {
+      TaskAttempt bestAttempt = selectBestAttempt();
+      if (bestAttempt == null) {
+        return 0;
+      }
+      return bestAttempt.getProgress();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  //this is always called with the read or write lock held
+  private long getLaunchTime() {
+    long launchTime = 0;
+    for (TaskAttempt at : attempts.values()) {
+      //select the least launch time of all attempts
+      if (launchTime == 0  || launchTime > at.getLaunchTime()) {
+        launchTime = at.getLaunchTime();
+      }
+    }
+    return launchTime;
+  }
+
+  //this is always called with the read or write lock held
+  private long getFinishTime() {
+    if (!isFinished()) {
+      return 0;
+    }
+    long finishTime = 0;
+    for (TaskAttempt at : attempts.values()) {
+      //select the max finish time of all attempts
+      if (finishTime < at.getFinishTime()) {
+        finishTime = at.getFinishTime();
+      }
+    }
+    return finishTime;
+  }
+
+  //select the attempt with the best progress
+  // always called inside the Read Lock
+  private TaskAttempt selectBestAttempt() {
+    float progress = 0f;
+    TaskAttempt result = null;
+    for (TaskAttempt at : attempts.values()) {
+      if (result == null) {
+        result = at; //The first time around
+      }
+      //TODO: consider the attempt only if it is not failed/killed ?
+      // calculate the best progress
+      if (at.getProgress() > progress) {
+        result = at;
+        progress = at.getProgress();
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public boolean canCommit(TaskAttemptID taskAttemptID) {
+    readLock.lock();
+    boolean canCommit = false;
+    try {
+      if (commitAttempt != null) {
+        canCommit = taskAttemptID.equals(commitAttempt);
+        LOG.info("Result of canCommit for " + taskAttemptID + ":" + canCommit);
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return canCommit;
+  }
+
+  protected abstract TaskAttemptImpl createAttempt();
+
+  // No override of this method may require that the subclass be initialized.
+  protected abstract int getMaxAttempts();
+
+  protected TaskAttempt getSuccessfulAttempt() {
+    readLock.lock();
+    try {
+      if (null == successfulAttempt) {
+        return null;
+      }
+      return attempts.get(successfulAttempt);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  // This is always called in the Write Lock
+  private void addAndScheduleAttempt() {
+    TaskAttempt attempt = createAttempt();
+    LOG.info("Created attempt " + attempt.getID());
+    switch (attempts.size()) {
+      case 0:
+        attempts = Collections.singletonMap(attempt.getID(), attempt);
+        break;
+        
+      case 1:
+        Map<TaskAttemptID, TaskAttempt> newAttempts
+            = new LinkedHashMap<TaskAttemptID, TaskAttempt>(maxAttempts);
+        newAttempts.putAll(attempts);
+        attempts = newAttempts;
+        attempts.put(attempt.getID(), attempt);
+        break;
+
+      default:
+        attempts.put(attempt.getID(), attempt);
+        break;
+    }
+    ++nextAttemptNumber;
+    ++numberUncompletedAttempts;
+    //schedule the new attempt
+    eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
+        TaskAttemptEventType.TA_SCHEDULE));
+  }
+
+  @Override
+  public void handle(TaskEvent event) {
+    LOG.info("Processing " + event.getTaskID() + " of type " + event.getType());
+    writeLock.lock();
+    try {
+      TaskState oldState = getState();
+      try {
+        stateMachine.doTransition(event.getType(), event);
+      } catch (InvalidStateTransitonException e) {
+        LOG.error("Can't handle this event at current state", e);
+        internalError(event.getType());
+      }
+      if (oldState != getState()) {
+        LOG.info(taskId + " Task Transitioned from " + oldState + " to "
+            + getState());
+      }
+
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void internalError(TaskEventType type) {
+    eventHandler.handle(new JobDiagnosticsUpdateEvent(
+        this.taskId.jobID, "Invalid event " + type + 
+        " on Task " + this.taskId));
+    eventHandler.handle(new JobEvent(this.taskId.jobID,
+        JobEventType.INTERNAL_ERROR));
+  }
+
+  // always called inside a transition, in turn inside the Write Lock
+  private void handleTaskAttemptCompletion(TaskAttemptID attemptId,
+      TaskAttemptCompletionEventStatus status) {
+    finishedAttempts++;
+    TaskAttempt attempt = attempts.get(attemptId);
+    //raise the completion event only if the container is assigned
+    // to the attempt
+    if (attempt.getAssignedContainerMgrAddress() != null) {
+      TaskAttemptCompletionEvent tce = new TaskAttemptCompletionEvent();
+      tce.eventId = -1;
+      //TODO: XXXXXX  hardcoded port
+      tce.mapOutputServerAddress =
+        "http://" + attempt.getAssignedContainerMgrAddress().split(":")[0]
+                                                              + ":8080";
+      tce.status = status;
+      tce.attemptId = attempt.getID();
+      tce.attemptRunTime = 0; // TODO: set the exact run time of the task.
+      
+      //raise the event to job so that it adds the completion event to its
+      //data structures
+      eventHandler.handle(new JobTaskAttemptCompletedEvent(tce));
+    }
+  }
+
+  private static class InitialScheduleTransition
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskStartedEvent tse = new TaskStartedEvent(TypeConverter
+          .fromYarn(task.taskId), task.getLaunchTime(), TypeConverter
+          .fromYarn(task.taskId.taskType), TaskState.RUNNING.toString());
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tse));
+
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  // Used when creating a new attempt while one is already running.
+  //  Currently we do this for speculation.  In the future we may do this
+  //  for tasks that failed in a way that might indicate application code
+  //  problems, so we can take later failures in parallel and flush the
+  //  job quickly when this happens.
+  private static class RedundantScheduleTransition
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      LOG.info("Scheduling a redundant attempt for task " + task.taskId);
+      task.addAndScheduleAttempt();
+    }
+  }
+
+  private static class AttemptCommitPendingTransition 
+          implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
+      // This attempt is commit pending; decide whether it becomes the commitAttempt
+      TaskAttemptID attemptID = ev.getTaskAttemptID();
+      if (task.commitAttempt == null) {
+        // TODO: validate attemptID
+        task.commitAttempt = attemptID;
+        LOG.info(attemptID + " given a go for committing the task output.");
+      } else {
+        // Don't think this can be a pluggable decision, so simply raise an
+        // event for the TaskAttempt to delete its output.
+        LOG.info(task.commitAttempt
+            + " already given a go for committing the task output, so killing "
+            + attemptID);
+        task.eventHandler.handle(new TaskAttemptEvent(
+            attemptID, TaskAttemptEventType.TA_KILL));
+      }
+    }
+  }
+
+  private static class AttemptSucceededTransition 
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      task.handleTaskAttemptCompletion(
+          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          TaskAttemptCompletionEventStatus.SUCCEEDED);
+      --task.numberUncompletedAttempts;
+      task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+      task.eventHandler.handle(new JobTaskEvent(
+          task.taskId, TaskState.SUCCEEDED));
+      LOG.info("Task succeeded with attempt " + task.successfulAttempt);
+      TaskFinishedEvent tfe =
+          new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+            task.getFinishTime(),
+            TypeConverter.fromYarn(task.taskId.taskType),
+            TaskState.SUCCEEDED.toString(),
+            TypeConverter.fromYarn(task.getCounters()));
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+      // issue kill to all other attempts
+      for (TaskAttempt attempt : task.attempts.values()) {
+        // compare with equals() rather than ==, so the successful attempt
+        //  is never sent a spurious TA_KILL
+        if (!attempt.getID().equals(task.successfulAttempt) &&
+            !attempt.isFinished()) {
+          LOG.info("Issuing kill to other attempt " + attempt.getID());
+          task.eventHandler.handle(
+              new TaskAttemptEvent(attempt.getID(), 
+                  TaskAttemptEventType.TA_KILL));
+        }
+      }
+    }
+  }
+
+  private static class AttemptKilledTransition implements
+      SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      task.handleTaskAttemptCompletion(
+          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          TaskAttemptCompletionEventStatus.KILLED);
+      --task.numberUncompletedAttempts;
+      if (task.successfulAttempt == null) {
+        task.addAndScheduleAttempt();
+      }
+    }
+  }
+
+
+  private static class KillWaitAttemptKilledTransition implements
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+    protected TaskState finalState = TaskState.KILLED;
+
+    @Override
+    public TaskState transition(TaskImpl task, TaskEvent event) {
+      task.handleTaskAttemptCompletion(
+          ((TaskTAttemptEvent) event).getTaskAttemptID(), 
+          TaskAttemptCompletionEventStatus.KILLED);
+      // check whether all attempts are finished
+      if (task.finishedAttempts == task.attempts.size()) {
+        task.eventHandler.handle(
+            new JobTaskEvent(task.taskId, finalState));
+        return finalState;
+      }
+      return task.getState();
+    }
+  }
+
+  private static class AttemptFailedTransition implements
+    MultipleArcTransition<TaskImpl, TaskEvent, TaskState> {
+
+    @Override
+    public TaskState transition(TaskImpl task, TaskEvent event) {
+      task.failedAttempts++;
+      TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+      if (task.failedAttempts < task.maxAttempts) {
+        task.handleTaskAttemptCompletion(
+            castEvent.getTaskAttemptID(), 
+            TaskAttemptCompletionEventStatus.FAILED);
+        // we don't need a new event if we already have a spare
+        if (--task.numberUncompletedAttempts == 0
+            && task.successfulAttempt == null) {
+          task.addAndScheduleAttempt();
+        }
+      } else {
+        task.handleTaskAttemptCompletion(
+            castEvent.getTaskAttemptID(), 
+            TaskAttemptCompletionEventStatus.TIPFAILED);
+        TaskFinishedEvent tfi =
+            new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+                task.getFinishTime(),
+                TypeConverter.fromYarn(task.taskId.taskType),
+                TaskState.FAILED.toString(),
+                TypeConverter.fromYarn(task.getCounters()));
+        task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfi));
+        task.eventHandler.handle(
+            new JobTaskEvent(task.taskId, TaskState.FAILED));
+        return TaskState.FAILED;
+      }
+      return getDefaultState(task);
+    }
+
+    protected TaskState getDefaultState(Task task) {
+      return task.getState();
+    }
+
+    protected void unSucceed(TaskImpl task) {
+      ++task.numberUncompletedAttempts;
+      task.successfulAttempt = null;
+    }
+  }
+
+  private static class MapRetroactiveFailureTransition
+      extends AttemptFailedTransition {
+
+    @Override
+    public TaskState transition(TaskImpl task, TaskEvent event) {
+      //verify that this occurs only for map task
+      //TODO: consider moving it to MapTaskImpl
+      if (!TaskType.MAP.equals(task.getType())) {
+        LOG.error("Unexpected event for REDUCE task " + event.getType());
+        task.internalError(event.getType());
+      }
+      
+      // tell the job about the rescheduling
+      task.eventHandler.handle(
+          new JobMapTaskRescheduledEvent(task.taskId));
+      // super.transition is mostly coded for the case where an
+      //  UNcompleted task failed.  When a COMPLETED task retroactively
+      //  fails, we have to let AttemptFailedTransition.transition
+      //  believe that there's no redundancy.
+      unSucceed(task);
+      return super.transition(task, event);
+    }
+
+    @Override
+    protected TaskState getDefaultState(Task task) {
+      return TaskState.SCHEDULED;
+    }
+  }
+
+  private static class KillNewTransition 
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TaskFinishedEvent tfe =
+          new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+              task.getFinishTime(), TypeConverter.fromYarn(task.taskId.taskType),
+              TaskState.KILLED.toString(), TypeConverter.fromYarn(task
+                  .getCounters()));
+      task.eventHandler.handle(new JobHistoryEvent(task.taskId.jobID, tfe));
+      task.eventHandler.handle(
+          new JobTaskEvent(task.taskId, TaskState.KILLED));
+    }
+  }
+
+  private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+    if (attempt != null && !attempt.isFinished()) {
+      LOG.info(logMsg);
+      eventHandler.handle(
+          new TaskAttemptEvent(attempt.getID(),
+              TaskAttemptEventType.TA_KILL));
+    }
+  }
+
+  private static class KillTransition 
+    implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      // issue kill to all non finished attempts
+      for (TaskAttempt attempt : task.attempts.values()) {
+        task.killUnfinishedAttempt(
+            attempt, "Task KILL is received. Killing attempt!");
+      }
+
+      task.numberUncompletedAttempts = 0;
+    }
+  }
+}
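
TaskImpl's lifecycle above is driven entirely by the org.apache.hadoop.yarn.state
package: a StateMachineFactory is built once per class, each addTransition() call
registers an arc for a (state, event type) pair with an optional hook,
installTopology() freezes the table, and make(this) binds a per-instance
StateMachine whose doTransition() is what handle() invokes. The following is a
minimal, self-contained sketch of the same pattern; the Door, DoorState,
DoorEventType and DoorEvent names are hypothetical, and only the
StateMachineFactory/StateMachine/SingleArcTransition calls mirror the API
exercised by TaskImpl.

    import org.apache.hadoop.yarn.state.SingleArcTransition;
    import org.apache.hadoop.yarn.state.StateMachine;
    import org.apache.hadoop.yarn.state.StateMachineFactory;

    public class Door {

      public enum DoorState { CLOSED, OPEN }
      public enum DoorEventType { OPEN, CLOSE }

      // event payload; TaskImpl uses TaskEvent in this position
      public static class DoorEvent {
        private final DoorEventType type;
        public DoorEvent(DoorEventType type) { this.type = type; }
        public DoorEventType getType() { return type; }
      }

      // the transition table is built once and shared by all instances
      private static final StateMachineFactory
          <Door, DoorState, DoorEventType, DoorEvent> stateMachineFactory
          = new StateMachineFactory<Door, DoorState, DoorEventType, DoorEvent>
              (DoorState.CLOSED)
          .addTransition(DoorState.CLOSED, DoorState.OPEN, DoorEventType.OPEN,
              new SingleArcTransition<Door, DoorEvent>() {
                @Override
                public void transition(Door door, DoorEvent event) {
                  // per-arc side effects run here
                }
              })
          // the hook may be omitted when only the state changes
          .addTransition(DoorState.OPEN, DoorState.CLOSED, DoorEventType.CLOSE)
          .installTopology();

      // each instance binds its own machine, cf. stateMachineFactory.make(this)
      private final StateMachine<DoorState, DoorEventType, DoorEvent> stateMachine
          = stateMachineFactory.make(this);

      // cf. TaskImpl.handle(): route the event through the transition table
      public void handle(DoorEvent event) {
        stateMachine.doTransition(event.getType(), event);
      }

      public DoorState getState() {
        return stateMachine.getCurrentState();
      }
    }

As in TaskImpl, an event type arriving in a state with no registered arc makes
doTransition() throw InvalidStateTransitonException, which TaskImpl.handle()
converts into a JobEventType.INTERNAL_ERROR event.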

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,31 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+public interface ContainerLauncher 
+    extends EventHandler<ContainerLauncherEvent> {
+
+  enum EventType {
+    CONTAINER_REMOTE_LAUNCH,
+    CONTAINER_REMOTE_CLEANUP
+  }
+}
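
The ContainerLauncher contract is intentionally small: an implementation is just
an EventHandler that reacts to the two remote-container event types. Below is a
hypothetical skeleton, not taken from the commit; the class name and log
messages are invented, while the getters come from ContainerLauncherEvent,
shown next.

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
    import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;

    public class SketchContainerLauncher implements ContainerLauncher {

      private static final Log LOG =
          LogFactory.getLog(SketchContainerLauncher.class);

      @Override
      public void handle(ContainerLauncherEvent event) {
        // dispatch on the two event types the interface defines
        switch (event.getType()) {
        case CONTAINER_REMOTE_LAUNCH:
          LOG.info("Would launch container for " + event.getTaskAttemptID()
              + " at " + event.getContainerMgrAddress());
          break;
        case CONTAINER_REMOTE_CLEANUP:
          LOG.info("Would clean up container for " + event.getTaskAttemptID());
          break;
        }
      }
    }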

Added: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java?rev=1082677&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java (added)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java Thu Mar 17 20:21:13 2011
@@ -0,0 +1,66 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app.launcher;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.ContainerID;
+import org.apache.hadoop.yarn.ContainerToken;
+import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
+
+public class ContainerLauncherEvent 
+    extends AbstractEvent<ContainerLauncher.EventType> {
+
+  private TaskAttemptID taskAttemptID;
+  private ContainerID containerID;
+  private String containerMgrAddress;
+  private ContainerToken containerToken;
+
+  public ContainerLauncherEvent(TaskAttemptID taskAttemptID, 
+      ContainerID containerID,
+      String containerMgrAddress,
+      ContainerToken containerToken,
+      ContainerLauncher.EventType type) {
+    super(type);
+    this.taskAttemptID = taskAttemptID;
+    this.containerID = containerID;
+    this.containerMgrAddress = containerMgrAddress;
+    this.containerToken = containerToken;
+  }
+
+  public TaskAttemptID getTaskAttemptID() {
+    return this.taskAttemptID;
+  }
+
+  public ContainerID getContainerID() {
+    return containerID;
+  }
+
+  public String getContainerMgrAddress() {
+    return containerMgrAddress;
+  }
+
+  public ContainerToken getContainerToken() {
+    return containerToken;
+  }
+
+  @Override
+  public String toString() {
+    return super.toString() + " for taskAttempt " + taskAttemptID;
+  }
+}
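
Putting the two classes together, issuing a launch request is one event
construction plus one handle() call. A hedged usage sketch: the launch() helper
and its parameter names are invented, while the constructor call matches the
signature above.

    import org.apache.hadoop.mapreduce.v2.api.TaskAttemptID;
    import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
    import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
    import org.apache.hadoop.yarn.ContainerID;
    import org.apache.hadoop.yarn.ContainerToken;

    public class LaunchSketch {
      // hypothetical helper; the caller supplies values it received from
      // the resource allocator, as the task attempt code would
      static void launch(ContainerLauncher launcher, TaskAttemptID attemptID,
          ContainerID containerID, String containerMgrAddress,
          ContainerToken containerToken) {
        launcher.handle(new ContainerLauncherEvent(attemptID, containerID,
            containerMgrAddress, containerToken,
            ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
      }
    }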


