hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [9/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/sr...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/RecoveryService.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,447 @@
+///**
+//* Licensed to the Apache Software Foundation (ASF) under one
+//* or more contributor license agreements.  See the NOTICE file
+//* distributed with this work for additional information
+//* regarding copyright ownership.  The ASF licenses this file
+//* to you under the Apache License, Version 2.0 (the
+//* "License"); you may not use this file except in compliance
+//* with the License.  You may obtain a copy of the License at
+//*
+//*     http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing, software
+//* distributed under the License is distributed on an "AS IS" BASIS,
+//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//* See the License for the specific language governing permissions and
+//* limitations under the License.
+//*/
+//
+//package org.apache.hadoop.mapreduce.v2.app2.recover;
+//
+//import java.io.IOException;
+//import java.util.ArrayList;
+//import java.util.HashMap;
+//import java.util.LinkedList;
+//import java.util.List;
+//import java.util.Map;
+//
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.FSDataInputStream;
+//import org.apache.hadoop.fs.FileContext;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.mapreduce.MRJobConfig;
+//import org.apache.hadoop.mapreduce.OutputCommitter;
+//import org.apache.hadoop.mapreduce.TaskAttemptContext;
+//import org.apache.hadoop.mapreduce.TaskAttemptID;
+//import org.apache.hadoop.mapreduce.TaskType;
+//import org.apache.hadoop.mapreduce.TypeConverter;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskAttemptInfo;
+//import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
+//import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+//import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+//import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+//import org.apache.hadoop.mapreduce.v2.app2.ControlledClock;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobDiagnosticsUpdateEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerLaunchedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskTAttemptEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerRemoteLaunchEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+//import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+//import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+//import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+//import org.apache.hadoop.yarn.Clock;
+//import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+//import org.apache.hadoop.yarn.api.records.Container;
+//import org.apache.hadoop.yarn.api.records.ContainerId;
+//import org.apache.hadoop.yarn.api.records.NodeId;
+//import org.apache.hadoop.yarn.event.AsyncDispatcher;
+//import org.apache.hadoop.yarn.event.Dispatcher;
+//import org.apache.hadoop.yarn.event.Event;
+//import org.apache.hadoop.yarn.event.EventHandler;
+//import org.apache.hadoop.yarn.service.CompositeService;
+//import org.apache.hadoop.yarn.service.Service;
+//import org.apache.hadoop.yarn.util.BuilderUtils;
+//import org.apache.hadoop.yarn.util.ConverterUtils;
+//
+///*
+//* Recovers the completed tasks from the previous life of the Application
+//* Master. The completed tasks are deciphered from the history file of the
+//* previous life. The recovery service intercepts and replays the events
+//* for completed tasks. While recovery is in progress, the scheduling of
+//* new tasks is delayed by buffering the task schedule events.
+// * The recovery service controls the clock while recovery is in progress.
+// */
+//
+////TODO:
+////task cleanup for all non-completed tasks
+//public class RecoveryService extends CompositeService implements Recovery {
+//
+//  private static final Log LOG = LogFactory.getLog(RecoveryService.class);
+//
+//  private final ApplicationAttemptId applicationAttemptId;
+//  private final OutputCommitter committer;
+//  private final Dispatcher dispatcher;
+//  private final ControlledClock clock;
+//
+//  private JobInfo jobInfo = null;
+//  private final Map<TaskId, TaskInfo> completedTasks =
+//    new HashMap<TaskId, TaskInfo>();
+//
+//  private final List<TaskEvent> pendingTaskScheduleEvents =
+//    new ArrayList<TaskEvent>();
+//
+//  private volatile boolean recoveryMode = false;
+//
+//  public RecoveryService(ApplicationAttemptId applicationAttemptId, 
+//      Clock clock, OutputCommitter committer) {
+//    super("RecoveringDispatcher");
+//    this.applicationAttemptId = applicationAttemptId;
+//    this.committer = committer;
+//    this.dispatcher = createRecoveryDispatcher();
+//    this.clock = new ControlledClock(clock);
+//    addService((Service) dispatcher);
+//  }
+//
+//  @Override
+//  public void init(Configuration conf) {
+//    super.init(conf);
+//    // parse the history file
+//    try {
+//      parse();
+//    } catch (Exception e) {
+//      LOG.warn("Could not parse the old history file. Aborting recovery. "
+//          + "Starting afresh.", e);
+//    }
+//    if (completedTasks.size() > 0) {
+//      recoveryMode = true;
+//      LOG.info("SETTING THE RECOVERY MODE TO TRUE. NO OF COMPLETED TASKS "
+//          + "TO RECOVER " + completedTasks.size());
+//      LOG.info("Job launch time " + jobInfo.getLaunchTime());
+//      clock.setTime(jobInfo.getLaunchTime());
+//    }
+//  }
+//
+//  @Override
+//  public Dispatcher getDispatcher() {
+//    return dispatcher;
+//  }
+//
+//  @Override
+//  public Clock getClock() {
+//    return clock;
+//  }
+//
+//  @Override
+//  public Map<TaskId, TaskInfo> getCompletedTasks() {
+//    return completedTasks;
+//  }
+//
+//  @Override
+//  public List<AMInfo> getAMInfos() {
+//    if (jobInfo == null || jobInfo.getAMInfos() == null) {
+//      return new LinkedList<AMInfo>();
+//    }
+//    List<AMInfo> amInfos = new LinkedList<AMInfo>();
+//    for (org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.AMInfo jhAmInfo : jobInfo
+//        .getAMInfos()) {
+//      AMInfo amInfo =
+//          MRBuilderUtils.newAMInfo(jhAmInfo.getAppAttemptId(),
+//              jhAmInfo.getStartTime(), jhAmInfo.getContainerId(),
+//              jhAmInfo.getNodeManagerHost(), jhAmInfo.getNodeManagerPort(),
+//              jhAmInfo.getNodeManagerHttpPort());
+//
+//      amInfos.add(amInfo);
+//    }
+//    return amInfos;
+//  }
+//
+//  private void parse() throws IOException {
+//    // TODO: parse history file based on startCount
+//    String jobName = 
+//        TypeConverter.fromYarn(applicationAttemptId.getApplicationId()).toString();
+//    String jobhistoryDir = JobHistoryUtils.getConfiguredHistoryStagingDirPrefix(getConfig());
+//    FSDataInputStream in = null;
+//    Path historyFile = null;
+//    Path histDirPath = FileContext.getFileContext(getConfig()).makeQualified(
+//        new Path(jobhistoryDir));
+//    FileContext fc = FileContext.getFileContext(histDirPath.toUri(),
+//        getConfig());
+//    //read the previous history file
+//    historyFile = fc.makeQualified(JobHistoryUtils.getStagingJobHistoryFile(
+//        histDirPath, jobName, (applicationAttemptId.getAttemptId() - 1)));  
+//    LOG.info("History file is at " + historyFile);
+//    in = fc.open(historyFile);
+//    JobHistoryParser parser = new JobHistoryParser(in);
+//    jobInfo = parser.parse();
+//    Exception parseException = parser.getParseException();
+//    if (parseException != null) {
+//      LOG.info("Got an error parsing job-history file " + historyFile + 
+//          ", ignoring incomplete events.", parseException);
+//    }
+//    Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
+//        .getAllTasks();
+//    for (TaskInfo taskInfo : taskInfos.values()) {
+//      if (TaskState.SUCCEEDED.toString().equals(taskInfo.getTaskStatus())) {
+//        completedTasks
+//            .put(TypeConverter.toYarn(taskInfo.getTaskId()), taskInfo);
+//        LOG.info("Read from history task "
+//            + TypeConverter.toYarn(taskInfo.getTaskId()));
+//      }
+//    }
+//    LOG.info("Read completed tasks from history "
+//        + completedTasks.size());
+//  }
+//  
+//  protected Dispatcher createRecoveryDispatcher() {
+//    return new RecoveryDispatcher();
+//  }
+//
+//  @SuppressWarnings("rawtypes")
+//  class RecoveryDispatcher extends AsyncDispatcher {
+//    private final EventHandler actualHandler;
+//    private final EventHandler handler;
+//
+//    RecoveryDispatcher() {
+//      super();
+//      actualHandler = super.getEventHandler();
+//      handler = new InterceptingEventHandler(actualHandler);
+//    }
+//
+//    @Override
+//    @SuppressWarnings("unchecked")
+//    public void dispatch(Event event) {
+//      if (recoveryMode) {
+//        if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) {
+//          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+//              .getTaskAttemptID());
+//          LOG.info("Recovered Attempt start time " + attInfo.getStartTime());
+//          clock.setTime(attInfo.getStartTime());
+//
+//        } else if (event.getType() == TaskAttemptEventType.TA_DONE
+//            || event.getType() == TaskAttemptEventType.TA_FAILMSG
+//            || event.getType() == TaskAttemptEventType.TA_KILL) {
+//          TaskAttemptInfo attInfo = getTaskAttemptInfo(((TaskAttemptEvent) event)
+//              .getTaskAttemptID());
+//          LOG.info("Recovered Attempt finish time " + attInfo.getFinishTime());
+//          clock.setTime(attInfo.getFinishTime());
+//        }
+//
+//        else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED
+//            || event.getType() == TaskEventType.T_ATTEMPT_KILLED
+//            || event.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED) {
+//          TaskTAttemptEvent tEvent = (TaskTAttemptEvent) event;
+//          LOG.info("Recovered Task attempt " + tEvent.getTaskAttemptID());
+//          TaskInfo taskInfo = completedTasks.get(tEvent.getTaskAttemptID()
+//              .getTaskId());
+//          taskInfo.getAllTaskAttempts().remove(
+//              TypeConverter.fromYarn(tEvent.getTaskAttemptID()));
+//          // remove the task info from completed tasks if all attempts are
+//          // recovered
+//          if (taskInfo.getAllTaskAttempts().size() == 0) {
+//            completedTasks.remove(tEvent.getTaskAttemptID().getTaskId());
+//            // checkForRecoveryComplete
+//            LOG.info("CompletedTasks() " + completedTasks.size());
+//            if (completedTasks.size() == 0) {
+//              recoveryMode = false;
+//              clock.reset();
+//              LOG.info("Setting the recovery mode to false. " +
+//                 "Recovery is complete!");
+//
+//              // send all pending tasks schedule events
+//              for (TaskEvent tEv : pendingTaskScheduleEvents) {
+//                actualHandler.handle(tEv);
+//              }
+//
+//            }
+//          }
+//        }
+//      }
+//      realDispatch(event);
+//    }
+//    
+//    public void realDispatch(Event event) {
+//      super.dispatch(event);
+//    }
+//
+//    @Override
+//    public EventHandler getEventHandler() {
+//      return handler;
+//    }
+//  }
+//
+//  private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) {
+//    TaskInfo taskInfo = completedTasks.get(id.getTaskId());
+//    return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id));
+//  }
+//
+//  @SuppressWarnings({"rawtypes", "unchecked"})
+//  private class InterceptingEventHandler implements EventHandler {
+//    EventHandler actualHandler;
+//
+//    InterceptingEventHandler(EventHandler actualHandler) {
+//      this.actualHandler = actualHandler;
+//    }
+//
+//    @Override
+//    public void handle(Event event) {
+//      if (!recoveryMode) {
+//        // not in recovery: delegate to the actual handler
+//        actualHandler.handle(event);
+//        return;
+//      }
+//
+//      else if (event.getType() == TaskEventType.T_SCHEDULE) {
+//        TaskEvent taskEvent = (TaskEvent) event;
+//        // delay the scheduling of new tasks till previous ones are recovered
+//        if (completedTasks.get(taskEvent.getTaskID()) == null) {
+//          LOG.debug("Adding to pending task events "
+//              + taskEvent.getTaskID());
+//          pendingTaskScheduleEvents.add(taskEvent);
+//          return;
+//        }
+//      }
+//
+//      else if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
+//        TaskAttemptId aId = ((ContainerAllocatorEvent) event).getAttemptID();
+//        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+//        LOG.debug("CONTAINER_REQ " + aId);
+//        sendAssignedEvent(aId, attInfo);
+//        return;
+//      }
+//
+//      else if (event.getType() == TaskCleaner.EventType.TASK_CLEAN) {
+//        TaskAttemptId aId = ((TaskCleanupEvent) event).getAttemptID();
+//        LOG.debug("TASK_CLEAN");
+//        actualHandler.handle(new TaskAttemptEvent(aId,
+//            TaskAttemptEventType.TA_CLEANUP_DONE));
+//        return;
+//      }
+//
+//      else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH) {
+//        TaskAttemptId aId = ((ContainerRemoteLaunchEvent) event)
+//            .getTaskAttemptID();
+//        TaskAttemptInfo attInfo = getTaskAttemptInfo(aId);
+//        actualHandler.handle(new TaskAttemptContainerLaunchedEvent(aId,
+//            attInfo.getShufflePort()));
+//        // send the status update event
+//        sendStatusUpdateEvent(aId, attInfo);
+//
+//        TaskAttemptState state = TaskAttemptState.valueOf(attInfo.getTaskStatus());
+//        switch (state) {
+//        case SUCCEEDED:
+//          //recover the task output
+//          TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
+//              attInfo.getAttemptId());
+//          try { 
+//            TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
+//            int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1); 
+//            if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
+//              committer.recoverTask(taskContext);
+//              LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
+//            } else {
+//              LOG.info("Will not try to recover output for "
+//                  + taskContext.getTaskAttemptID());
+//            }
+//          } catch (IOException e) {
+//            LOG.error("Caught an exception while trying to recover task "+aId, e);
+//            actualHandler.handle(new JobDiagnosticsUpdateEvent(
+//                aId.getTaskId().getJobId(), "Error in recovering task output " + 
+//                e.getMessage()));
+//            actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
+//                JobEventType.INTERNAL_ERROR));
+//          }
+//          
+//          // send the done event
+//          LOG.info("Sending done event to recovered attempt " + aId);
+//          actualHandler.handle(new TaskAttemptEvent(aId,
+//              TaskAttemptEventType.TA_DONE));
+//          break;
+//        case KILLED:
+//          LOG.info("Sending kill event to recovered attempt " + aId);
+//          actualHandler.handle(new TaskAttemptEvent(aId,
+//              TaskAttemptEventType.TA_KILL));
+//          break;
+//        default:
+//          LOG.info("Sending fail event to recovered attempt " + aId);
+//          actualHandler.handle(new TaskAttemptEvent(aId,
+//              TaskAttemptEventType.TA_FAILMSG));
+//          break;
+//        }
+//        return;
+//      }
+//
+//      else if (event.getType() == 
+//        ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP) {
+//        TaskAttemptId aId = ((ContainerLauncherEvent) event)
+//          .getTaskAttemptID();
+//        actualHandler.handle(
+//           new TaskAttemptEvent(aId,
+//                TaskAttemptEventType.TA_CONTAINER_CLEANED));
+//        return;
+//      }
+//
+//      // delegate to the actual handler
+//      actualHandler.handle(event);
+//    }
+//
+//    private void sendStatusUpdateEvent(TaskAttemptId yarnAttemptID,
+//        TaskAttemptInfo attemptInfo) {
+//      LOG.info("Sending status update event to " + yarnAttemptID);
+//      TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+//      taskAttemptStatus.id = yarnAttemptID;
+//      taskAttemptStatus.progress = 1.0f;
+//      taskAttemptStatus.stateString = attemptInfo.getTaskStatus(); 
+//      // taskAttemptStatus.outputSize = attemptInfo.getOutputSize();
+//      taskAttemptStatus.phase = Phase.CLEANUP;
+//      // attemptInfo.getCounters() may be null; the status update accepts that
+//      taskAttemptStatus.counters = attemptInfo.getCounters();
+//      actualHandler.handle(new TaskAttemptStatusUpdateEvent(
+//          taskAttemptStatus.id, taskAttemptStatus));
+//    }
+//
+//    private void sendAssignedEvent(TaskAttemptId yarnAttemptID,
+//        TaskAttemptInfo attemptInfo) {
+//      LOG.info("Sending assigned event to " + yarnAttemptID);
+//      ContainerId cId = attemptInfo.getContainerId();
+//
+//      NodeId nodeId =
+//          ConverterUtils.toNodeId(attemptInfo.getHostname() + ":"
+//              + attemptInfo.getPort());
+//      // Resource/Priority/ApplicationACLs are only needed while launching the
+//      // container on an NM, these are already completed tasks, so setting them
+//      // to null
+//      Container container = BuilderUtils.newContainer(cId, nodeId,
+//          attemptInfo.getTrackerName() + ":" + attemptInfo.getHttpPort(),
+//          null, null, null);
+//      actualHandler.handle(new TaskAttemptContainerAssignedEvent(yarnAttemptID,
+//          container, null));
+//    }
+//  }
+//
+//}
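
The class comment above describes the core of the approach: while history
events are being replayed, T_SCHEDULE events for tasks that have not been
recovered yet are buffered, everything else passes through to the real
dispatcher, and the buffer is flushed once the completed-task map drains.
Below is a minimal, self-contained sketch of that buffering pattern; the
Event interface, the Consumer-based handler, and the finishRecovery() hook
are illustrative stand-ins, not the API added by this patch.

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.function.Consumer;

    /** Hypothetical stand-ins for the YARN event/handler types. */
    interface Event { String type(); }

    final class BufferingHandler implements Consumer<Event> {
      private final Consumer<Event> actualHandler;  // the real dispatcher's handler
      private final Queue<Event> pendingScheduleEvents = new ArrayDeque<>();
      private volatile boolean recoveryMode = true;

      BufferingHandler(Consumer<Event> actualHandler) {
        this.actualHandler = actualHandler;
      }

      @Override
      public void accept(Event e) {
        if (recoveryMode && "T_SCHEDULE".equals(e.type())) {
          pendingScheduleEvents.add(e);  // delay new work until replay finishes
          return;
        }
        actualHandler.accept(e);         // everything else passes through
      }

      void finishRecovery() {
        recoveryMode = false;            // the same flip RecoveryService performs
        for (Event e; (e = pendingScheduleEvents.poll()) != null; ) {
          actualHandler.accept(e);       // flush the buffered schedule events
        }
      }
    }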

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/recover/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.recover;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,12 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+
+public class AMSchedulerEvent extends AbstractEvent<AMSchedulerEventType> {
+
+  // TODO Not a very useful class...
+  public AMSchedulerEvent(AMSchedulerEventType type) {
+    super(type);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContaienrCompleted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContaienrCompleted.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContaienrCompleted.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContaienrCompleted.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,19 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContaienrCompleted extends AMSchedulerEvent {
+
+  private final ContainerId containerId;
+  
+  public AMSchedulerEventContaienrCompleted(ContainerId containerId) {
+    super(AMSchedulerEventType.S_CONTAINER_COMPLETED);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventContainersAllocated.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,26 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class AMSchedulerEventContainersAllocated extends AMSchedulerEvent {
+
+  private final List<ContainerId> containerIds;
+  private final boolean headRoomChanged;
+
+  public AMSchedulerEventContainersAllocated(List<ContainerId> containerIds,
+      boolean headRoomChanged) {
+    super(AMSchedulerEventType.S_CONTAINERS_ALLOCATED);
+    this.containerIds = containerIds;
+    this.headRoomChanged = headRoomChanged;
+  }
+
+  public List<ContainerId> getContainerIds() {
+    return this.containerIds;
+  }
+
+  public boolean didHeadroomChange() {
+    return headRoomChanged;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerEventType.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,26 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+public enum AMSchedulerEventType {
+  //Producer: TaskAttempt
+  S_TA_LAUNCH_REQUEST,
+  S_TA_STOP_REQUEST, // Maybe renamed to S_TA_END / S_TA_ABNORMAL_END
+  S_TA_SUCCEEDED,
+  S_TA_ENDED,
+  
+  //Producer: RMCommunicator
+  S_CONTAINERS_ALLOCATED,
+  
+  //Producer: Container. (Maybe RMCommunicator)
+  S_CONTAINER_COMPLETED,
+  
+  // Add events for nodes being blacklisted.
+  
+  // TODO XXX
+  //Producer: RMCommunicator. May not be needed.
+//  S_CONTAINER_COMPLETED,
+  
+  //Producer: RMComm
+//  S_NODE_UNHEALTHY,
+//  S_NODE_HEALTHY,
+  
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTALaunchRequestEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,104 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class AMSchedulerTALaunchRequestEvent extends AMSchedulerEvent {
+
+  // TODO XXX: Get rid of remoteTask from here. Can be forgotten after it has been assigned.
+  //.... Maybe have the Container talk to the TaskAttempt to pull in the remote task.
+  
+  private final TaskAttemptId attemptId;
+  private final boolean rescheduled;
+  private final Resource capability;
+  private final org.apache.hadoop.mapred.Task remoteTask;
+  private final TaskAttempt taskAttempt;
+  private final Credentials credentials;
+  private Token<JobTokenIdentifier> jobToken;
+  private final String[] hosts;
+  private final String[] racks;
+  
+  
+  public AMSchedulerTALaunchRequestEvent(TaskAttemptId attemptId,
+      boolean rescheduled, Resource capability,
+      org.apache.hadoop.mapred.Task remoteTask, TaskAttempt ta,
+      Credentials credentials, Token<JobTokenIdentifier> jobToken,
+      String[] hosts, String[] racks) {
+    super(AMSchedulerEventType.S_TA_LAUNCH_REQUEST);
+    this.attemptId = attemptId;
+    this.rescheduled = rescheduled;
+    this.capability = capability;
+    this.remoteTask = remoteTask;
+    this.taskAttempt = ta;
+    this.credentials = credentials;
+    this.jobToken = jobToken;
+    this.hosts = hosts;
+    this.racks = racks;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  public Resource getCapability() {
+    return capability;
+  }
+
+  public String[] getHosts() {
+    return hosts;
+  }
+  
+  public String[] getRacks() {
+    return racks;
+  }
+  
+  public boolean isRescheduled() {
+    return rescheduled;
+  }
+  
+  public org.apache.hadoop.mapred.Task getRemoteTask() {
+    return remoteTask;
+  }
+  
+  public TaskAttempt getTaskAttempt() {
+    return this.taskAttempt;
+  }
+  
+  public Credentials getCredentials() {
+    return this.credentials;
+  }
+  
+  public Token<JobTokenIdentifier> getJobToken() {
+    return this.jobToken;
+  }
+ 
+  //TODO Passing taskAttempt for now. Required params from the call to createContainerLaunchContext
+  /*
+  ContainerLaunchContext clc = TaskAttemptImplHelpers.createContainerLaunchContext(null, null, ta.conf, ta.jobToken, ta.remoteTask, ta.oldJobId, null, null, ta.taskAttemptListener, ta.credentials)
+      //TODO Where are the AppACLs coming from?
+      // containerId not present.
+      // assignedCapability versus requested capability.
+      // jvmId isn't known at this point. Dependent on the containerId.
+      // TaskAttemptListener may not be required at this point.
+  */
+
+  //XXX jvmId is required in the launch context - to construct the command line.
+  //XXX Parameter replacement: @taskid@ will not be usable
+  //XXX ProfileTaskRange not available along with ContainerReUse
+  //XXX TaskAttemptId - firstTaskId used in YarnChild to setup JvmId, metrics etc.
+  // For now, will differentiate based on Map/Reduce task.
+  
+  /*Requirements to determine a container request.
+   * + Data-local + Rack-local hosts.
+   * + Resource capability
+   * + Env - mapreduce.map.env / mapreduce.reduce.env can change. M/R log level. 
+   * - JobConf and JobJar file - same location.
+   * - Distributed Cache - identical for map / reduce tasks at the moment.
+   * - Credentials, tokens etc are identical.
+   * + Command - dependent on map / reduce java.opts
+   */
+}
\ No newline at end of file
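
The requirements list at the end of this file reasons about which
launch-request attributes can differ between task attempts (resource
capability, env, java.opts-derived command) and which are identical for
every attempt of a job (job jar, distributed cache, credentials). Below is
a hedged sketch of how the varying attributes could be folded into a reuse
key, so that two requests with equal keys can be served by the same
container; ContainerSignature and its fields are illustrative assumptions,
not part of this patch.

    import java.util.Objects;

    /** Hypothetical reuse key over the attributes the comment says can vary. */
    final class ContainerSignature {
      final int memoryMb;        // resource capability
      final String env;          // mapreduce.{map,reduce}.env
      final String javaOpts;     // mapreduce.{map,reduce}.java.opts

      ContainerSignature(int memoryMb, String env, String javaOpts) {
        this.memoryMb = memoryMb;
        this.env = env;
        this.javaOpts = javaOpts;
      }

      @Override
      public boolean equals(Object o) {
        if (!(o instanceof ContainerSignature)) return false;
        ContainerSignature s = (ContainerSignature) o;
        return memoryMb == s.memoryMb
            && Objects.equals(env, s.env)
            && Objects.equals(javaOpts, s.javaOpts);
      }

      @Override
      public int hashCode() {
        return Objects.hash(memoryMb, env, javaOpts);
      }
    }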

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTAStopRequestEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,29 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class AMSchedulerTAStopRequestEvent extends AMSchedulerEvent {
+
+  // TODO XXX: Maybe include the ContainerId along with this -> for TOO_MANY_FETCH_FAILURES.
+  private final TaskAttemptId attemptId;
+  private final boolean failed;
+
+  public AMSchedulerTAStopRequestEvent(TaskAttemptId attemptId, boolean failed) {
+    super(AMSchedulerEventType.S_TA_STOP_REQUEST);
+    this.attemptId = attemptId;
+    this.failed = failed;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+  // TODO XXX: Rename
+  public boolean failed() {
+    return failed;
+  }
+
+  public boolean killed() {
+    return !failed;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/AMSchedulerTASucceededEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,18 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+
+public class AMSchedulerTASucceededEvent extends AMSchedulerEvent {
+
+  private final TaskAttemptId attemptId;
+
+  public AMSchedulerTASucceededEvent(TaskAttemptId attemptId) {
+    super(AMSchedulerEventType.S_TA_SUCCEEDED);
+    this.attemptId = attemptId;
+  }
+
+  public TaskAttemptId getAttemptID() {
+    return this.attemptId;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/ContainerAllocator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.EventHandler;
+
+// TODO XXX Rename to AMScheduler.
+public interface ContainerAllocator extends EventHandler<AMSchedulerEvent>{
+
+//  enum EventType {
+//
+//    CONTAINER_REQ,
+//    CONTAINER_DEALLOCATE,
+//    CONTAINER_FAILED
+//  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,77 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class NMCommunicatorEvent extends AbstractEvent<NMCommunicatorEventType> {
+
+  private final ContainerId containerId;
+  private final NodeId nodeId;
+  private final ContainerToken containerToken;
+
+  public NMCommunicatorEvent(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken, NMCommunicatorEventType type) {
+    super(type);
+    this.containerId = containerId;
+    this.nodeId = nodeId;
+    this.containerToken = containerToken;
+  }
+
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+  public NodeId getNodeId() {
+    return this.nodeId;
+  }
+
+  public ContainerToken getContainerToken() {
+    return this.containerToken;
+  }
+  
+  @Override
+  public String toString() {
+    return super.toString() + " for container " + containerId + ", nodeId: "
+        + nodeId;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((containerId == null) ? 0 : containerId.hashCode());
+    result = prime * result
+        + ((containerToken == null) ? 0 : containerToken.hashCode());
+    result = prime * result + ((nodeId == null) ? 0 : nodeId.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    NMCommunicatorEvent other = (NMCommunicatorEvent) obj;
+    if (containerId == null) {
+      if (other.containerId != null)
+        return false;
+    } else if (!containerId.equals(other.containerId))
+      return false;
+    if (containerToken == null) {
+      if (other.containerToken != null)
+        return false;
+    } else if (!containerToken.equals(other.containerToken))
+      return false;
+    if (nodeId == null) {
+      if (other.nodeId != null)
+        return false;
+    } else if (!nodeId.equals(other.nodeId))
+      return false;
+    return true;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorEventType.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,7 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+// TODO - Re-use the events in ContainerLauncher..
+public enum NMCommunicatorEventType {
+  CONTAINER_LAUNCH_REQUEST,
+  CONTAINER_STOP_REQUEST
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorLaunchRequestEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,21 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+
+public class NMCommunicatorLaunchRequestEvent extends NMCommunicatorEvent {
+
+  private final ContainerLaunchContext clc;
+
+  public NMCommunicatorLaunchRequestEvent(ContainerLaunchContext clc,
+      Container container) {
+    super(clc.getContainerId(), container.getNodeId(), container
+        .getContainerToken(), NMCommunicatorEventType.CONTAINER_LAUNCH_REQUEST);
+    this.clc = clc;
+  }
+
+  public ContainerLaunchContext getContainerLaunchContext() {
+    return this.clc;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/NMCommunicatorStopRequestEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,15 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class NMCommunicatorStopRequestEvent extends NMCommunicatorEvent {
+
+  public NMCommunicatorStopRequestEvent(ContainerId containerId, NodeId nodeId,
+      ContainerToken containerToken) {
+    super(containerId, nodeId, containerToken,
+        NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicator.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,296 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.AMRMProtocol;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Registers/unregisters to RM and sends heartbeats to RM.
+ */
+public abstract class RMCommunicator extends AbstractService  {
+  private static final Log LOG = LogFactory.getLog(RMCommunicator.class);
+  private int rmPollInterval;//millis
+  protected ApplicationId applicationId;
+  protected ApplicationAttemptId applicationAttemptId;
+  private AtomicBoolean stopped;
+  protected Thread allocatorThread;
+  @SuppressWarnings("rawtypes")
+  protected EventHandler eventHandler;
+  protected AMRMProtocol scheduler;
+  private final ClientService clientService;
+  protected int lastResponseID;
+  private Resource minContainerCapability;
+  private Resource maxContainerCapability;
+  protected Map<ApplicationAccessType, String> applicationACLs;
+
+  protected final AppContext context;
+  private Job job;
+  // Has a signal (SIGTERM etc) been issued?
+  protected volatile boolean isSignalled = false;
+
+  public RMCommunicator(ClientService clientService, AppContext context) {
+    super("RMCommunicator");
+    this.clientService = clientService;
+    this.context = context;
+    this.eventHandler = context.getEventHandler();
+    this.applicationId = context.getApplicationID();
+    this.applicationAttemptId = context.getApplicationAttemptId();
+    this.stopped = new AtomicBoolean(false);
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    rmPollInterval =
+        conf.getInt(MRJobConfig.MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS,
+            MRJobConfig.DEFAULT_MR_AM_TO_RM_HEARTBEAT_INTERVAL_MS);
+  }
+
+  @Override
+  public void start() {
+    scheduler = createSchedulerProxy();
+    register();
+    startAllocatorThread();
+    JobID id = TypeConverter.fromYarn(this.applicationId);
+    JobId jobId = TypeConverter.toYarn(id);
+    job = context.getJob(jobId);
+    super.start();
+  }
+
+  protected AppContext getContext() {
+    return context;
+  }
+
+  protected Job getJob() {
+    return job;
+  }
+  
+  public Map<ApplicationAccessType, String> getApplicationAcls() {
+    return this.applicationACLs;
+  }
+
+  /**
+   * Get the appProgress. Can be used only after this component is started.
+   * @return the appProgress.
+   */
+  protected float getApplicationProgress() {
+    // For now just a single job. In future when we have a DAG, we need an
+    // aggregate progress.
+    return this.job.getProgress();
+  }
+
+  // TODO XXX: Get rid of the dependencies on the ClientService.
+  protected void register() {
+    //Register
+    InetSocketAddress serviceAddr = clientService.getBindAddress();
+    try {
+      RegisterApplicationMasterRequest request = Records
+          .newRecord(RegisterApplicationMasterRequest.class);
+      request.setApplicationAttemptId(applicationAttemptId);
+      request.setHost(serviceAddr.getHostName());
+      request.setRpcPort(serviceAddr.getPort());
+      request.setTrackingUrl(serviceAddr.getHostName() + ":" + clientService.getHttpPort());
+      RegisterApplicationMasterResponse response =
+        scheduler.registerApplicationMaster(request);
+      minContainerCapability = response.getMinimumResourceCapability();
+      maxContainerCapability = response.getMaximumResourceCapability();
+      this.context.getClusterInfo().setMinContainerCapability(
+          minContainerCapability);
+      this.context.getClusterInfo().setMaxContainerCapability(
+          maxContainerCapability);
+      this.applicationACLs = response.getApplicationACLs();
+      LOG.info("minContainerCapability: " + minContainerCapability.getMemory());
+      LOG.info("maxContainerCapability: " + maxContainerCapability.getMemory());
+    } catch (Exception are) {
+      LOG.error("Exception while registering", are);
+      throw new YarnException(are);
+    }
+  }
+
+  protected void unregister() {
+    try {
+      FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
+      if (job.getState() == JobState.SUCCEEDED) {
+        finishState = FinalApplicationStatus.SUCCEEDED;
+      } else if (job.getState() == JobState.KILLED
+          || (job.getState() == JobState.RUNNING && isSignalled)) {
+        finishState = FinalApplicationStatus.KILLED;
+      } else if (job.getState() == JobState.FAILED
+          || job.getState() == JobState.ERROR) {
+        finishState = FinalApplicationStatus.FAILED;
+      }
+      StringBuffer sb = new StringBuffer();
+      for (String s : job.getDiagnostics()) {
+        sb.append(s).append("\n");
+      }
+      LOG.info("Setting job diagnostics to " + sb.toString());
+
+      String historyUrl = JobHistoryUtils.getHistoryUrl(getConfig(),
+          context.getApplicationID());
+      LOG.info("History url is " + historyUrl);
+
+      FinishApplicationMasterRequest request = Records
+          .newRecord(FinishApplicationMasterRequest.class);
+      request.setAppAttemptId(this.applicationAttemptId);
+      request.setFinishApplicationStatus(finishState);
+      request.setDiagnostics(sb.toString());
+      request.setTrackingUrl(historyUrl);
+      scheduler.finishApplicationMaster(request);
+    } catch(Exception are) {
+      LOG.error("Exception while unregistering ", are);
+    }
+  }
+
+  protected Resource getMinContainerCapability() {
+    return minContainerCapability;
+  }
+
+  protected Resource getMaxContainerCapability() {
+    return maxContainerCapability;
+  }
+
+  @Override
+  public void stop() {
+    if (stopped.getAndSet(true)) {
+      // return if already stopped
+      return;
+    }
+    allocatorThread.interrupt();
+    try {
+      allocatorThread.join();
+    } catch (InterruptedException ie) {
+      LOG.warn("InterruptedException while stopping", ie);
+    }
+    unregister();
+    super.stop();
+  }
+
+  protected void startAllocatorThread() {
+    allocatorThread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
+          try {
+            Thread.sleep(rmPollInterval);
+            try {
+              heartbeat();
+            } catch (YarnException e) {
+              LOG.error("Error communicating with RM: " + e.getMessage() , e);
+              return;
+            } catch (Exception e) {
+              LOG.error("ERROR IN CONTACTING RM. ", e);
+              // TODO: for other exceptions
+            }
+          } catch (InterruptedException e) {
+            LOG.warn("Allocated thread interrupted. Returning.");
+            return;
+          }
+        }
+      }
+    });
+    allocatorThread.setName("RMCommunicator");
+    allocatorThread.start();
+  }
+
+  protected AMRMProtocol createSchedulerProxy() {
+    final Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+    final InetSocketAddress serviceAddr = conf.getSocketAddr(
+        YarnConfiguration.RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT);
+
+    UserGroupInformation currentUser;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      String tokenURLEncodedStr = System.getenv().get(
+          ApplicationConstants.APPLICATION_MASTER_TOKEN_ENV_NAME);
+      Token<? extends TokenIdentifier> token = new Token<TokenIdentifier>();
+
+      try {
+        token.decodeFromUrlString(tokenURLEncodedStr);
+      } catch (IOException e) {
+        throw new YarnException(e);
+      }
+
+      SecurityUtil.setTokenService(token, serviceAddr);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("AppMasterToken is " + token);
+      }
+      currentUser.addToken(token);
+    }
+
+    return currentUser.doAs(new PrivilegedAction<AMRMProtocol>() {
+      @Override
+      public AMRMProtocol run() {
+        return (AMRMProtocol) rpc.getProxy(AMRMProtocol.class,
+            serviceAddr, conf);
+      }
+    });
+  }
+
+  protected abstract void heartbeat() throws Exception;
+
+  public void setSignalled(boolean isSignalled) {
+    this.isSignalled = isSignalled;
+    LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+  }
+}
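
RMCommunicator's allocator thread pairs an AtomicBoolean stop flag with an
interrupt-and-join shutdown: a heartbeat blocked in sleep() is woken by the
interrupt, and stop() waits for the thread before unregistering, so no
heartbeat can race the unregister call. Below is a minimal sketch of that
poll-loop/stop protocol with illustrative names (HeartbeatLoop is not part
of the patch):

    import java.util.concurrent.atomic.AtomicBoolean;

    final class HeartbeatLoop {
      private final AtomicBoolean stopped = new AtomicBoolean(false);
      private Thread worker;

      void start(final long pollIntervalMs, final Runnable heartbeat) {
        worker = new Thread(new Runnable() {
          public void run() {
            while (!stopped.get() && !Thread.currentThread().isInterrupted()) {
              try {
                Thread.sleep(pollIntervalMs);   // wait out the poll interval
                try {
                  heartbeat.run();              // then talk to the RM
                } catch (RuntimeException e) {
                  // RMCommunicator logs most heartbeat failures and keeps polling
                }
              } catch (InterruptedException ie) {
                return;                         // stop() interrupted us: exit
              }
            }
          }
        }, "HeartbeatLoop");
        worker.start();
      }

      void stop() throws InterruptedException {
        if (stopped.getAndSet(true)) {
          return;                               // idempotent, like RMCommunicator.stop()
        }
        worker.interrupt();
        worker.join();                          // wait for the loop to exit
      }
    }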

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorContainerDeAllocateRequestEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,19 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+
+public class RMCommunicatorContainerDeAllocateRequestEvent extends
+    RMCommunicatorEvent {
+
+  private final ContainerId containerId;
+  
+  public RMCommunicatorContainerDeAllocateRequestEvent(ContainerId containerId) {
+    super(RMCommunicatorEventType.CONTAINER_DEALLOCATE);
+    this.containerId = containerId;
+  }
+  
+  public ContainerId getContainerId() {
+    return this.containerId;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEvent.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,11 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+public class RMCommunicatorEvent extends AbstractEvent<RMCommunicatorEventType> {
+
+  public RMCommunicatorEvent(RMCommunicatorEventType type) {
+    super(type);
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/rm/RMCommunicatorEventType.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,9 @@
+package org.apache.hadoop.mapreduce.v2.app2.rm;
+
+public enum RMCommunicatorEventType {
+  // TODO Essentially the same as ContainerAllocator.
+  // TODO XXX: Clean this up. CONTAINER_REQ and CONTAINER_FAILED not used.
+  CONTAINER_REQ,
+  CONTAINER_DEALLOCATE,
+  CONTAINER_FAILED
+}


