hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [16/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MockJobs.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,625 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobACLsManager;
+import org.apache.hadoop.mapred.ShuffleHandler;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.FileSystemCounter;
+import org.apache.hadoop.mapreduce.JobACL;
+import org.apache.hadoop.mapreduce.JobCounter;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.TaskCounter;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.Phase;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.MockApps;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MockJobs extends MockApps {
+  static final Iterator<JobState> JOB_STATES = Iterators.cycle(JobState
+      .values());
+  static final Iterator<TaskState> TASK_STATES = Iterators.cycle(TaskState
+      .values());
+  static final Iterator<TaskAttemptState> TASK_ATTEMPT_STATES = Iterators
+      .cycle(TaskAttemptState.values());
+  static final Iterator<TaskType> TASK_TYPES = Iterators.cycle(TaskType
+      .values());
+  static final Iterator<JobCounter> JOB_COUNTERS = Iterators.cycle(JobCounter
+      .values());
+  static final Iterator<FileSystemCounter> FS_COUNTERS = Iterators
+      .cycle(FileSystemCounter.values());
+  static final Iterator<TaskCounter> TASK_COUNTERS = Iterators
+      .cycle(TaskCounter.values());
+  static final Iterator<String> FS_SCHEMES = Iterators.cycle("FILE", "HDFS",
+      "LAFS", "CEPH");
+  static final Iterator<String> USER_COUNTER_GROUPS = Iterators
+      .cycle(
+          "com.company.project.subproject.component.subcomponent.UserDefinedSpecificSpecialTask$Counters",
+          "PigCounters");
+  static final Iterator<String> USER_COUNTERS = Iterators.cycle("counter1",
+      "counter2", "counter3");
+  static final Iterator<Phase> PHASES = Iterators.cycle(Phase.values());
+  static final Iterator<String> DIAGS = Iterators.cycle(
+      "Error: java.lang.OutOfMemoryError: Java heap space",
+      "Lost task tracker: tasktracker.domain/127.0.0.1:40879");
+
+  public static final String NM_HOST = "localhost";
+  public static final int NM_PORT = 1234;
+  public static final int NM_HTTP_PORT = 8042;
+
+  static final int DT = 1000000; // ms
+
+  public static String newJobName() {
+    return newAppName();
+  }
+
+  /**
+   * Creates {@code numJobs} jobs in a map, with each job's appId equal to
+   * its jobId (see the usage sketch after this method).
+   */
+  public static Map<JobId, Job> newJobs(int numJobs, int numTasksPerJob,
+      int numAttemptsPerTask) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobs; ++j) {
+      ApplicationId appID = MockJobs.newAppID(j);
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
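+
+  // A minimal usage sketch for the factory above (illustrative values only):
+  //
+  //   // two jobs, each with three tasks of two attempts; appId == jobId
+  //   Map<JobId, Job> jobs = MockJobs.newJobs(2, 3, 2);
+  //   Job job = jobs.values().iterator().next();
+  //   assert job.getTasks().size() == 3;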
+  
+  public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+      int numTasksPerJob, int numAttemptsPerTask) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobsPerApp; ++j) {
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+  
+  public static Map<JobId, Job> newJobs(ApplicationId appID, int numJobsPerApp,
+      int numTasksPerJob, int numAttemptsPerTask, boolean hasFailedTasks) {
+    Map<JobId, Job> map = Maps.newHashMap();
+    for (int j = 0; j < numJobsPerApp; ++j) {
+      Job job = newJob(appID, j, numTasksPerJob, numAttemptsPerTask, null,
+          hasFailedTasks);
+      map.put(job.getID(), job);
+    }
+    return map;
+  }
+
+  public static JobId newJobID(ApplicationId appID, int i) {
+    JobId id = Records.newRecord(JobId.class);
+    id.setAppId(appID);
+    id.setId(i);
+    return id;
+  }
+
+  public static JobReport newJobReport(JobId id) {
+    JobReport report = Records.newRecord(JobReport.class);
+    report.setJobId(id);
+    report.setStartTime(System.currentTimeMillis()
+        - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setMapProgress((float) Math.random());
+    report.setReduceProgress((float) Math.random());
+    report.setJobState(JOB_STATES.next());
+    return report;
+  }
+
+  public static TaskReport newTaskReport(TaskId id) {
+    TaskReport report = Records.newRecord(TaskReport.class);
+    report.setTaskId(id);
+    report.setStartTime(System.currentTimeMillis()
+        - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setProgress((float) Math.random());
+    report.setCounters(TypeConverter.toYarn(newCounters()));
+    report.setTaskState(TASK_STATES.next());
+    return report;
+  }
+
+  public static TaskAttemptReport newTaskAttemptReport(TaskAttemptId id) {
+    TaskAttemptReport report = Records.newRecord(TaskAttemptReport.class);
+    report.setTaskAttemptId(id);
+    report.setStartTime(System.currentTimeMillis()
+        - (int) (Math.random() * DT));
+    report.setFinishTime(System.currentTimeMillis()
+        + (int) (Math.random() * DT) + 1);
+    report.setPhase(PHASES.next());
+    report.setTaskAttemptState(TASK_ATTEMPT_STATES.next());
+    report.setProgress((float) Math.random());
+    report.setCounters(TypeConverter.toYarn(newCounters()));
+    return report;
+  }
+
+  public static Counters newCounters() {
+    Counters hc = new Counters();
+    for (JobCounter c : JobCounter.values()) {
+      hc.findCounter(c).setValue((long) (Math.random() * 1000));
+    }
+    for (TaskCounter c : TaskCounter.values()) {
+      hc.findCounter(c).setValue((long) (Math.random() * 1000));
+    }
+    int nc = FileSystemCounter.values().length * 4;
+    for (int i = 0; i < nc; ++i) {
+      for (FileSystemCounter c : FileSystemCounter.values()) {
+        hc.findCounter(FS_SCHEMES.next(), c).setValue(
+            (long) (Math.random() * DT));
+      }
+    }
+    for (int i = 0; i < 2 * 3; ++i) {
+      hc.findCounter(USER_COUNTER_GROUPS.next(), USER_COUNTERS.next())
+          .setValue((long) (Math.random() * 100000));
+    }
+    return hc;
+  }
+
+  public static Map<TaskAttemptId, TaskAttempt> newTaskAttempts(TaskId tid,
+      int m) {
+    Map<TaskAttemptId, TaskAttempt> map = Maps.newHashMap();
+    for (int i = 0; i < m; ++i) {
+      TaskAttempt ta = newTaskAttempt(tid, i);
+      map.put(ta.getID(), ta);
+    }
+    return map;
+  }
+
+  public static TaskAttempt newTaskAttempt(TaskId tid, int i) {
+    final TaskAttemptId taid = Records.newRecord(TaskAttemptId.class);
+    taid.setTaskId(tid);
+    taid.setId(i);
+    final TaskAttemptReport report = newTaskAttemptReport(taid);
+    final List<String> diags = Lists.newArrayList();
+    diags.add(DIAGS.next());
+    return new TaskAttempt() {
+      @Override
+      public NodeId getNodeId() throws UnsupportedOperationException {
+        throw new UnsupportedOperationException();
+      }
+      
+      @Override
+      public TaskAttemptId getID() {
+        return taid;
+      }
+
+      @Override
+      public TaskAttemptReport getReport() {
+        return report;
+      }
+
+      @Override
+      public long getLaunchTime() {
+        return 0;
+      }
+
+      @Override
+      public long getFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public int getShufflePort() {
+        return ShuffleHandler.DEFAULT_SHUFFLE_PORT;
+      }
+
+      @Override
+      public Counters getCounters() {
+        if (report != null && report.getCounters() != null) {
+          return new Counters(TypeConverter.fromYarn(report.getCounters()));
+        }
+        return null;
+      }
+
+      @Override
+      public float getProgress() {
+        return report.getProgress();
+      }
+
+      @Override
+      public TaskAttemptState getState() {
+        return report.getTaskAttemptState();
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.getTaskAttemptState()) {
+        case SUCCEEDED:
+        case FAILED:
+        case KILLED:
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public ContainerId getAssignedContainerID() {
+        ContainerId id = Records.newRecord(ContainerId.class);
+        ApplicationAttemptId appAttemptId = Records
+            .newRecord(ApplicationAttemptId.class);
+        appAttemptId.setApplicationId(taid.getTaskId().getJobId().getAppId());
+        appAttemptId.setAttemptId(0);
+        id.setApplicationAttemptId(appAttemptId);
+        return id;
+      }
+
+      @Override
+      public String getNodeHttpAddress() {
+        return "localhost:8042";
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        return diags;
+      }
+
+      @Override
+      public String getAssignedContainerMgrAddress() {
+        return "localhost:9998";
+      }
+
+      @Override
+      public long getShuffleFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public long getSortFinishTime() {
+        return 0;
+      }
+
+      @Override
+      public String getNodeRackName() {
+        return "/default-rack";
+      }
+    };
+  }
+
+  public static Map<TaskId, Task> newTasks(JobId jid, int n, int m, boolean hasFailedTasks) {
+    Map<TaskId, Task> map = Maps.newHashMap();
+    for (int i = 0; i < n; ++i) {
+      Task task = newTask(jid, i, m, hasFailedTasks);
+      map.put(task.getID(), task);
+    }
+    return map;
+  }
+
+  public static Task newTask(JobId jid, int i, int m, final boolean hasFailedTasks) {
+    final TaskId tid = Records.newRecord(TaskId.class);
+    tid.setJobId(jid);
+    tid.setId(i);
+    tid.setTaskType(TASK_TYPES.next());
+    final TaskReport report = newTaskReport(tid);
+    final Map<TaskAttemptId, TaskAttempt> attempts = newTaskAttempts(tid, m);
+    return new Task() {
+      @Override
+      public TaskId getID() {
+        return tid;
+      }
+
+      @Override
+      public TaskReport getReport() {
+        return report;
+      }
+
+      @Override
+      public Counters getCounters() {
+        if (hasFailedTasks) {
+          return null;
+        }
+        return new Counters(
+          TypeConverter.fromYarn(report.getCounters()));
+      }
+
+      @Override
+      public float getProgress() {
+        return report.getProgress();
+      }
+
+      @Override
+      public TaskType getType() {
+        return tid.getTaskType();
+      }
+
+      @Override
+      public Map<TaskAttemptId, TaskAttempt> getAttempts() {
+        return attempts;
+      }
+
+      @Override
+      public TaskAttempt getAttempt(TaskAttemptId attemptID) {
+        return attempts.get(attemptID);
+      }
+
+      @Override
+      public boolean isFinished() {
+        switch (report.getTaskState()) {
+        case SUCCEEDED:
+        case KILLED:
+        case FAILED:
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      public boolean canCommit(TaskAttemptId taskAttemptID) {
+        return false;
+      }
+
+      @Override
+      public TaskState getState() {
+        return report.getTaskState();
+      }
+    };
+  }
+
+  public static Counters getCounters(
+      Collection<Task> tasks) {
+    List<Task> completedTasks = new ArrayList<Task>();
+    for (Task task : tasks) {
+      if (task.getCounters() != null) {
+        completedTasks.add(task);
+      }
+    }
+    Counters counters = new Counters();
+    return JobImpl.incrTaskCounters(counters, completedTasks);
+  }
+
+  static class TaskCount {
+    int maps;
+    int reduces;
+    int completedMaps;
+    int completedReduces;
+
+    void incr(Task task) {
+      TaskType type = task.getType();
+      boolean finished = task.isFinished();
+      if (type == TaskType.MAP) {
+        if (finished) {
+          ++completedMaps;
+        }
+        ++maps;
+      } else if (type == TaskType.REDUCE) {
+        if (finished) {
+          ++completedReduces;
+        }
+        ++reduces;
+      }
+    }
+  }
+
+  static TaskCount getTaskCount(Collection<Task> tasks) {
+    TaskCount tc = new TaskCount();
+    for (Task task : tasks) {
+      tc.incr(task);
+    }
+    return tc;
+  }
+
+  public static Job newJob(ApplicationId appID, int i, int n, int m) {
+    return newJob(appID, i, n, m, null);
+  }
+
+  public static Job newJob(ApplicationId appID, int i, int n, int m, Path confFile) {
+    return newJob(appID, i, n, m, confFile, false);
+  }
+  
+  public static Job newJob(ApplicationId appID, int i, int n, int m,
+      Path confFile, boolean hasFailedTasks) {
+    final JobId id = newJobID(appID, i);
+    final String name = newJobName();
+    final JobReport report = newJobReport(id);
+    final Map<TaskId, Task> tasks = newTasks(id, n, m, hasFailedTasks);
+    final TaskCount taskCount = getTaskCount(tasks.values());
+    final Counters counters = getCounters(tasks.values());
+    final Path configFile = confFile;
+
+    Map<JobACL, AccessControlList> tmpJobACLs = new HashMap<JobACL, AccessControlList>();
+    final Configuration conf = new Configuration();
+    conf.set(JobACL.VIEW_JOB.getAclName(), "testuser");
+    conf.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
+
+    JobACLsManager aclsManager = new JobACLsManager(conf);
+    tmpJobACLs = aclsManager.constructJobACLs(conf);
+    final Map<JobACL, AccessControlList> jobACLs = tmpJobACLs;
+    return new Job() {
+      @Override
+      public JobId getID() {
+        return id;
+      }
+
+      @Override
+      public String getName() {
+        return name;
+      }
+
+      @Override
+      public JobState getState() {
+        return report.getJobState();
+      }
+
+      @Override
+      public JobReport getReport() {
+        return report;
+      }
+
+      @Override
+      public float getProgress() {
+        return 0;
+      }
+
+      @Override
+      public Counters getAllCounters() {
+        return counters;
+      }
+
+      @Override
+      public Map<TaskId, Task> getTasks() {
+        return tasks;
+      }
+
+      @Override
+      public Task getTask(TaskId taskID) {
+        return tasks.get(taskID);
+      }
+
+      @Override
+      public int getTotalMaps() {
+        return taskCount.maps;
+      }
+
+      @Override
+      public int getTotalReduces() {
+        return taskCount.reduces;
+      }
+
+      @Override
+      public int getCompletedMaps() {
+        return taskCount.completedMaps;
+      }
+
+      @Override
+      public int getCompletedReduces() {
+        return taskCount.completedReduces;
+      }
+
+      @Override
+      public boolean isUber() {
+        return false;
+      }
+
+      @Override
+      public TaskAttemptCompletionEvent[] getTaskAttemptCompletionEvents(
+          int fromEventId, int maxEvents) {
+        return null;
+      }
+
+      @Override
+      public Map<TaskId, Task> getTasks(TaskType taskType) {
+        throw new UnsupportedOperationException("Not supported yet.");
+      }
+
+      @Override
+      public List<String> getDiagnostics() {
+        return Collections.<String> emptyList();
+      }
+
+      @Override
+      public boolean checkAccess(UserGroupInformation callerUGI,
+          JobACL jobOperation) {
+        return true;
+      }
+
+      @Override
+      public String getUserName() {
+        return "mock";
+      }
+
+      @Override
+      public String getQueueName() {
+        return "mockqueue";
+      }
+
+      @Override
+      public Path getConfFile() {
+        return configFile;
+      }
+
+      @Override
+      public Map<JobACL, AccessControlList> getJobACLs() {
+        return jobACLs;
+      }
+
+      @Override
+      public List<AMInfo> getAMInfos() {
+        List<AMInfo> amInfoList = new LinkedList<AMInfo>();
+        amInfoList.add(createAMInfo(1));
+        amInfoList.add(createAMInfo(2));
+        return amInfoList;
+      }
+
+      @Override
+      public Configuration loadConfFile() throws IOException {
+        FileContext fc = FileContext.getFileContext(configFile.toUri(), conf);
+        Configuration jobConf = new Configuration(false);
+        jobConf.addResource(fc.open(configFile), configFile.toString());
+        return jobConf;
+      }
+
+      @Override
+      public Configuration getConf() {
+        // TODO Auto-generated method stub
+        return null;
+      }
+    };
+  }
+
+  private static AMInfo createAMInfo(int attempt) {
+    ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId(
+        BuilderUtils.newApplicationId(100, 1), attempt);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    return MRBuilderUtils.newAMInfo(appAttemptId, System.currentTimeMillis(),
+        containerId, NM_HOST, NM_PORT, NM_HTTP_PORT);
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFail.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,330 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Iterator;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptListenerImpl2;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.yarn.api.ContainerManager;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerToken;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt failure 
+ * scenarios.
+ */
+@SuppressWarnings("unchecked")
+public class TestFail {
+
+  @Test
+  //The first attempt fails and the second succeeds,
+  //so the job succeeds.
+  public void testFailTask() throws Exception {
+    MRApp app = new MockFirstFailingAttemptMRApp(1, 0);
+    Configuration conf = new Configuration();
+    // this test requires two task attempts, but uberization overrides max to 1
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED,
+        task.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", 2, attempts.size());
+    //one attempt must have failed
+    //and the other succeeded
+    Iterator<TaskAttempt> it = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+        it.next().getReport().getTaskAttemptState());
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED,
+        it.next().getReport().getTaskAttemptState());
+  }
+
+  @Test
+  public void testMapFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(4, 0);
+    Configuration conf = new Configuration();
+    
+    //reduce the number of attempts so the test runs faster
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+    
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 20);
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    
+    //setting the failure percentage to 25% (1/4 is 25) will
+    //make the Job successful
+    app = new MockFirstFailingTaskMRApp(4, 0);
+    conf = new Configuration();
+    
+    //reduce the number of attempts so the test runs faster
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 2);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+    
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 25);
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  @Test
+  public void testReduceFailureMaxPercent() throws Exception {
+    MRApp app = new MockFirstFailingTaskMRApp(2, 4);
+    Configuration conf = new Configuration();
+    
+    //reduce the number of attempts so the test runs faster
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+    
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 20);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    
+    //setting the failure percentage to 25% (1/4 is 25) will
+    //make the Job successful
+    app = new MockFirstFailingTaskMRApp(2, 4);
+    conf = new Configuration();
+    
+    //reduce the number of attempts so the test runs faster
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 2);
+    
+    conf.setInt(MRJobConfig.MAP_FAILURES_MAX_PERCENT, 50);//no failure due to Map
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+    conf.setInt(MRJobConfig.REDUCE_FAILURES_MAXPERCENT, 25);
+    conf.setInt(MRJobConfig.REDUCE_MAX_ATTEMPTS, 1);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  @Test
+  //All Task attempts are timed out, leading to Job failure
+  public void testTimedOutTask() throws Exception {
+    MRApp app = new TimeOutTaskMRApp(1, 0);
+    Configuration conf = new Configuration();
+    int maxAttempts = 2;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    // disable uberization (requires entire job to be reattempted, so max for
+    // subtask attempts is overridden to 1)
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.FAILED,
+        task.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts =
+        tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts,
+        attempts.size());
+    for (TaskAttempt attempt : attempts.values()) {
+      Assert.assertEquals("Attempt state not correct", TaskAttemptState.FAILED,
+          attempt.getReport().getTaskAttemptState());
+    }
+  }
+
+  @Test
+  public void testTaskFailWithUnusedContainer() throws Exception {
+    MRApp app = new MRAppWithFailingTaskAndUnusedContainer();
+    Configuration conf = new Configuration();
+    int maxAttempts = 1;
+    conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
+    // disable uberization (requires entire job to be reattempted, so max for
+    // subtask attempts is overridden to 1)
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
+        .next().getAttempts();
+    Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
+        .size());
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.START_WAIT);
+    // TODO XXX: This may not be a valid test.
+    app.getDispatcher().getEventHandler().handle(
+        new TaskAttemptEvent(attempt.getID(),
+            TaskAttemptEventType.TA_TERMINATED));
+    app.waitForState(job, JobState.FAILED);
+  }
+
+  static class MRAppWithFailingTaskAndUnusedContainer extends MRApp {
+
+    public MRAppWithFailingTaskAndUnusedContainer() {
+      super(1, 0, false, "TaskFailWithUnusedContainer", true);
+    }
+
+    @Override
+    protected ContainerLauncher createContainerLauncher(AppContext context) {
+      return new ContainerLauncherImpl(context) {
+        @Override
+        public void handle(NMCommunicatorEvent event) {
+
+          switch (event.getType()) {
+          case CONTAINER_LAUNCH_REQUEST:
+            super.handle(event); // Unused event and container.
+            break;
+          case CONTAINER_STOP_REQUEST:
+//            getContext().getEventHandler().handle(
+//                new TaskAttemptEvent(event.getTaskAttemptID(),
+//                    TaskAttemptEventType.TA_CONTAINER_CLEANED));
+            // TODO XXX: May need a CONTAINER_COMPLETED event to go out.
+            break;
+          }
+        }
+
+        @Override
+        protected ContainerManager getCMProxy(ContainerId containerID,
+            String containerManagerBindAddr, ContainerToken containerToken)
+            throws IOException {
+          try {
+            synchronized (this) {
+              wait(); // Just hang the thread simulating a very slow NM.
+            }
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+          return null;
+        }
+      };
+    }
+  }
+
+  static class TimeOutTaskMRApp extends MRApp {
+    TimeOutTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, false, "TimeOutTaskMRApp", true);
+    }
+    @Override
+    protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+        TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+      return new TaskAttemptListenerImpl2(getContext(), thh, chh, null) {
+        @Override
+        public void startRpcServer() {}
+        @Override
+        public void stopRpcServer() {}
+        @Override
+        public InetSocketAddress getAddress() {
+          return NetUtils.createSocketAddr("localhost", 1234);
+        }
+      };
+    }
+    
+    protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+        Configuration conf) {
+      // Creates a TaskHeartbeatHandler with a low timeout value. THH will
+      // send out a lost event leading to attempt failure.
+      return new TaskHeartbeatHandler(getContext(), 1) {
+        @Override
+        public void init(Configuration conf) {
+          conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
+          super.init(conf);
+        }
+      };
+    }
+  }
+
+  //All attempts of the first Task fail
+  static class MockFirstFailingTaskMRApp extends MRApp {
+
+    MockFirstFailingTaskMRApp(int maps, int reduces) {
+      super(maps, reduces, true, "MockFirstFailingTaskMRApp", true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0) { //check if it is the first task
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+        // TODO XXX: Was FAIL_MSG. Remove comment.
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  //The first attempt fails
+  static class MockFirstFailingAttemptMRApp extends MRApp {
+    MockFirstFailingAttemptMRApp(int maps, int reduces) {
+      super(maps, reduces, true, "MockFirstFailingAttemptMRApp", true);
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+        //check if it is the first task's first attempt
+        // send the Fail event
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID, TaskAttemptEventType.TA_FAILED));
+        // TODO XXX: Was FAIL_MSG. Remove comment.
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestFail t = new TestFail();
+    t.testFailTask();
+    t.testTimedOutTask();
+    t.testMapFailureMaxPercent();
+    t.testReduceFailureMaxPercent();
+    t.testTaskFailWithUnusedContainer();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFetchFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFetchFailure.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFetchFailure.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestFetchFailure.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,273 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobTaskAttemptFetchFailureEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.junit.Test;
+
+public class TestFetchFailure {
+
+  @Test
+  public void testFetchFailure() throws Exception {
+    MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+       2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+    
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    TaskAttemptCompletionEvent[] events = 
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+    
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt = 
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+    
+    //send 3 fetch failures from the reduce to trigger map re-execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    
+    //map attempt must have become FAILED
+    Assert.assertEquals("Map TaskAttempt state not correct",
+        TaskAttemptState.FAILED, mapAttempt1.getState());
+
+    Assert.assertEquals("Num attempts in Map Task not correct",
+        2, mapTask.getAttempts().size());
+    
+    Iterator<TaskAttempt> atIt = mapTask.getAttempts().values().iterator();
+    atIt.next();
+    TaskAttempt mapAttempt2 = atIt.next();
+    
+    app.waitForState(mapAttempt2, TaskAttemptState.RUNNING);
+    //send the done signal to the second map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt2.getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+    
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    //previous completion event now becomes obsolete
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        4, events.length);
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[0].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt1.getID(), events[1].getAttemptId());
+    Assert.assertEquals("Event map attempt id not correct",
+        mapAttempt2.getID(), events[2].getAttemptId());
+    Assert.assertEquals("Event redude attempt id not correct",
+        reduceAttempt.getID(), events[3].getAttemptId());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.OBSOLETE, events[0].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt1",
+        TaskAttemptCompletionEventStatus.FAILED, events[1].getStatus());
+    Assert.assertEquals("Event status not correct for map attempt2",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[2].getStatus());
+    Assert.assertEquals("Event status not correct for reduce attempt1",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
+  }
+  
+  /**
+   * Tests that if a map attempt fails (say, due to fetch failures), it gets
+   * re-run. If the AM dies while the next map attempt is running, then on AM
+   * restart the AM must not incorrectly remember the first failed attempt.
+   * Recovery currently does not recover running tasks, so effectively the AM
+   * re-runs the maps from scratch.
+   */
+  @Test
+  public void testFetchFailureWithRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+
+    //wait for Task state move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events = 
+      job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt = 
+      reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    //send 3 fetch failures from the reduce to trigger map re-execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    //wait for map Task state move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    // Crash the app again.
+    app.stop();
+
+    //rerun
+    app =
+      new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+          ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask = it.next();
+    reduceTask = it.next();
+
+    // the map is not in a SUCCEEDED state after restart of AM
+    app.waitForState(mapTask, TaskState.RUNNING);
+    mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    //send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    //send done to reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+          TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct", 2, events.length);
+  }
+
+  private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt, 
+      TaskAttempt mapAttempt) {
+    app.getContext().getEventHandler().handle(
+        new JobTaskAttemptFetchFailureEvent(
+            reduceAttempt.getID(), 
+            Arrays.asList(new TaskAttemptId[] {mapAttempt.getID()})));
+  }
+  
+  static class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestJobEndNotifier.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestJobEndNotifier.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestJobEndNotifier.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,146 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.net.Proxy;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests job end notification.
+ */
+public class TestJobEndNotifier extends JobEndNotifier {
+
+  //Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
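+  //(the assertions below imply numTries = min(MAX_ATTEMPTS, RETRY_ATTEMPTS + 1))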
+  private void testNumRetries(Configuration conf) {
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "0");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "10");
+    setConf(conf);
+    Assert.assertTrue("Expected numTries to be 0, but was " + numTries,
+      numTries == 0 );
+
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "1");
+    setConf(conf);
+    Assert.assertTrue("Expected numTries to be 1, but was " + numTries,
+      numTries == 1 );
+
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "20");
+    setConf(conf);
+    Assert.assertTrue("Expected numTries to be 11, but was " + numTries,
+      numTries == 11 ); //11 because number of _retries_ is 10
+  }
+
+  //Test maximum retry interval is capped by
+  //MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL
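+  //(the assertions below imply waitInterval = min(MAX_RETRY_INTERVAL,
+  //RETRY_INTERVAL), with negative values falling back to the default)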
+  private void testWaitInterval(Configuration conf) {
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "5");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "1");
+    setConf(conf);
+    Assert.assertTrue("Expected waitInterval to be 1, but was " + waitInterval,
+      waitInterval == 1);
+
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "10");
+    setConf(conf);
+    Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
+      waitInterval == 5);
+
+    //Test negative numbers are set to default
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "-10");
+    setConf(conf);
+    Assert.assertTrue("Expected waitInterval to be 5, but was " + waitInterval,
+      waitInterval == 5);
+  }
+
+  private void testProxyConfiguration(Configuration conf) {
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost");
+    setConf(conf);
+    Assert.assertTrue("Proxy shouldn't be set because port wasn't specified",
+      proxyToUse.type() == Proxy.Type.DIRECT);
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:someport");
+    setConf(conf);
+    Assert.assertTrue("Proxy shouldn't be set because port wasn't numeric",
+      proxyToUse.type() == Proxy.Type.DIRECT);
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "somehost:1000");
+    setConf(conf);
+    Assert.assertTrue("Proxy should have been set but wasn't ",
+      proxyToUse.toString().equals("HTTP @ somehost:1000"));
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "socks@somehost:1000");
+    setConf(conf);
+    Assert.assertTrue("Proxy should have been socks but wasn't ",
+      proxyToUse.toString().equals("SOCKS @ somehost:1000"));
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "SOCKS@somehost:1000");
+    setConf(conf);
+    Assert.assertTrue("Proxy should have been socks but wasn't ",
+      proxyToUse.toString().equals("SOCKS @ somehost:1000"));
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_PROXY, "sfafn@somehost:1000");
+    setConf(conf);
+    Assert.assertTrue("Proxy should have been http but wasn't ",
+      proxyToUse.toString().equals("HTTP @ somehost:1000"));
+    
+  }
+
+  /**
+   * Test that setting parameters has the desired effect
+   */
+  @Test
+  public void checkConfiguration() {
+    Configuration conf = new Configuration();
+    testNumRetries(conf);
+    testWaitInterval(conf);
+    testProxyConfiguration(conf);
+  }
+
+  protected int notificationCount = 0;
+  @Override
+  protected boolean notifyURLOnce() {
+    boolean success = super.notifyURLOnce();
+    notificationCount++;
+    return success;
+  }
+
+  //Check retries happen as intended
+  @Test
+  public void testNotifyRetries() throws InterruptedException {
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS, "3");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_ATTEMPTS, "3");
+    conf.set(MRJobConfig.MR_JOB_END_RETRY_INTERVAL, "3000");
+    conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_MAX_RETRY_INTERVAL, "3000");
+    JobReport jobReport = Mockito.mock(JobReport.class);
+
+    long startTime = System.currentTimeMillis();
+    this.notificationCount = 0;
+    this.setConf(conf);
+    this.notify(jobReport);
+    long endTime = System.currentTimeMillis();
+    Assert.assertEquals("Only 3 retries were expected but was : "
+      + this.notificationCount, this.notificationCount, 3);
+    Assert.assertTrue("Should have taken more than 9 seconds it took "
+      + (endTime - startTime), endTime - startTime > 9000);
+
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestKill.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestKill.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestKill.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,233 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventKillRequest;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.Service;
+import org.junit.Test;
+
+/**
+ * Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
+ *
+ */
+public class TestKill {
+
+  @Test
+  public void testKillJob() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    
+    MRApp app = new BlockingMRApp(1, 0, latch);
+    //this will start the job, but the job won't complete as the task is
+    //blocked
+    Job job = app.submit(new Configuration());
+    
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+    
+    //send the kill signal to Job
+    app.getContext().getEventHandler().handle(
+        new JobEvent(job.getID(), JobEventType.JOB_KILL));
+    
+    //unblock Task
+    latch.countDown();
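+    //the kill signal already sent should win over the now-unblocked attempt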
+
+    //wait and validate for Job to be KILLED
+    app.waitForState(job, JobState.KILLED);
+    app.waitForState(Service.STATE.STOPPED);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 1, 
+        tasks.size());
+    Task task = tasks.values().iterator().next();
+    Assert.assertEquals("Task state not correct", TaskState.KILLED, 
+        task.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts = 
+      tasks.values().iterator().next().getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 1, 
+        attempts.size());
+    Iterator<TaskAttempt> it = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED, 
+          it.next().getReport().getTaskAttemptState());
+
+    // XXX Will pass if using user facing states. Verify user facing as well as state machine states.
+    
+    // XXX 1. No PULL_REQUEST.
+    // XXX 2. No RELEASED.
+  }
+
+  @Test
+  public void testKillTask() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    MRApp app = new BlockingMRApp(2, 0, latch);
+    //this will start the job, but the job won't complete as the Task is blocked
+    Job job = app.submit(new Configuration());
+    
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 2, 
+        tasks.size());
+    Iterator<Task> it = tasks.values().iterator();
+    Task task1 = it.next();
+    Task task2 = it.next();
+    
+    //send the kill signal to the first Task
+    app.getContext().getEventHandler().handle(
+          new TaskEvent(task1.getID(), TaskEventType.T_KILL));
+    
+    //unblock Task
+    latch.countDown();
+    
+    //wait and validate for Job to become SUCCEEDED
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    //first Task is killed and the second succeeds;
+    //the Job succeeds
+    
+    Assert.assertEquals("Task state not correct", TaskState.KILLED, 
+        task1.getReport().getTaskState());
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED, 
+        task2.getReport().getTaskState());
+    Map<TaskAttemptId, TaskAttempt> attempts = task1.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 1, 
+        attempts.size());
+    Iterator<TaskAttempt> iter = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED, 
+          iter.next().getReport().getTaskAttemptState());
+
+    attempts = task2.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 1, 
+        attempts.size());
+    iter = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED, 
+          iter.next().getReport().getTaskAttemptState());
+  }
+
+  @Test
+  public void testKillTaskAttempt() throws Exception {
+    final CountDownLatch latch = new CountDownLatch(1);
+    MRApp app = new BlockingMRApp(2, 0, latch);
+    //this will start the job, but the job won't complete as the Task is blocked
+    Job job = app.submit(new Configuration());
+    
+    //wait and validate for Job to become RUNNING
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId,Task> tasks = job.getTasks();
+    Assert.assertEquals("No of tasks is not correct", 2, 
+        tasks.size());
+    Iterator<Task> it = tasks.values().iterator();
+    Task task1 = it.next();
+    Task task2 = it.next();
+    
+    //wait for the tasks to be scheduled
+    app.waitForState(task1, TaskState.SCHEDULED);
+    app.waitForState(task2, TaskState.SCHEDULED);
+    
+    //send the kill signal to the first Task's attempt
+    TaskAttempt attempt = task1.getAttempts().values().iterator().next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEventKillRequest(attempt.getID(),
+                "uni test kill request"));
+    
+    //unblock
+    latch.countDown();
+    
+    //wait and validate for Job to become SUCCEEDED
+    //job will still succeed
+    app.waitForState(job, JobState.SUCCEEDED);
+    
+    //first Task will have two attempts: the 1st is killed, the 2nd succeeds
+    //both Tasks and the Job succeed
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED, 
+        task1.getReport().getTaskState());
+    Assert.assertEquals("Task state not correct", TaskState.SUCCEEDED, 
+        task2.getReport().getTaskState());
+ 
+    Map<TaskAttemptId, TaskAttempt> attempts = task1.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 2, 
+        attempts.size());
+    Iterator<TaskAttempt> iter = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.KILLED, 
+          iter.next().getReport().getTaskAttemptState());
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED, 
+        iter.next().getReport().getTaskAttemptState());
+    
+    attempts = task2.getAttempts();
+    Assert.assertEquals("No of attempts is not correct", 1, 
+        attempts.size());
+    iter = attempts.values().iterator();
+    Assert.assertEquals("Attempt state not correct", TaskAttemptState.SUCCEEDED, 
+          iter.next().getReport().getTaskAttemptState());
+  }
+
+  static class BlockingMRApp extends MRApp {
+    private CountDownLatch latch;
+    BlockingMRApp(int maps, int reduces, CountDownLatch latch) {
+      super(maps, reduces, true, "testKill", true);
+      this.latch = latch;
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      if (attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0) {
+        //this blocks the first task's first attempt
+        //the subsequent ones are completed
+        try {
+          latch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      } else {
+        getContext().getEventHandler().handle(
+            new TaskAttemptEvent(attemptID,
+                TaskAttemptEventType.TA_DONE));
+      }
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestKill t = new TestKill();
+    t.testKillJob();
+    t.testKillTask();
+    t.testKillTaskAttempt();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRApp.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRApp.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRApp.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,419 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler2;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobUpdatedNodesEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.junit.Test;
+
+/**
+ * Tests the state machine of MR App.
+ */
+@SuppressWarnings("unchecked")
+public class TestMRApp {
+
+  @Test
+  public void testMapReduce() throws Exception {
+    MRApp app = new MRApp(2, 2, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    Assert.assertEquals(System.getProperty("user.name"), job.getUserName());
+  }
+
+  @Test
+  public void testZeroMaps() throws Exception {
+    MRApp app = new MRApp(0, 1, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+  
+  @Test
+  public void testZeroMapReduces() throws Exception{
+    MRApp app = new MRApp(0, 0, true, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  @Test
+  public void testCommitPending() throws Exception {
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+    TaskAttempt attempt = task.getAttempts().values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.RUNNING);
+
+    //send the commit pending signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            attempt.getID(),
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+
+    //wait for first attempt to commit pending
+    app.waitForState(attempt, TaskAttemptState.COMMIT_PENDING);
+
+    //send the done signal to the task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            task.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  //@Test
+  public void testCompletedMapsForReduceSlowstart() throws Exception {
+    MRApp app = new MRApp(2, 1, false, this.getClass().getName(), true);
+    Configuration conf = new Configuration();
+    //after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    //uberization forces full slowstart (1.0), so disable that
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    //all maps would be running
+    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task reduceTask = it.next();
+    
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    
+    //before sending the TA_DONE event, make sure the attempts have reached
+    //the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    
+    // reduces must be in NEW state
+    Assert.assertEquals("Reduce Task state not correct",
+        TaskState.NEW, reduceTask.getReport().getTaskState());
+    
+    //send the done signal to the 1st map task
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask1.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    //wait for first map task to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+
+    //Once the first map completes, it will schedule the reduces
+    //now reduce must be running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    
+    //send the done signal to 2nd map and the reduce to complete the job
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            mapTask2.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(
+            reduceTask.getAttempts().values().iterator().next().getID(),
+            TaskAttemptEventType.TA_DONE));
+    
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+  
+  /**
+   * Verifies that the AM re-runs maps that have run on bad nodes, that it
+   * records all success/killed events so that reduces are notified about map
+   * output status changes, and that the re-run information is preserved
+   * across an AM restart.
+   */
+  @Test
+  public void testUpdatedNodes() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    // after half of the map completion, reduce will start
+    conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f);
+    // uberization forces full slowstart (1.0), so disable that
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    NodeId node1 = task1Attempt.getNodeId();
+    NodeId node2 = task2Attempt.getNodeId();
+    Assert.assertEquals(node1, node2);
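+    // Both map attempts ran on the same (mock) node, so marking that single
+    // node unhealthy below is expected to kill both attempts.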
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // all maps must be succeeded
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events = job.getTaskAttemptCompletionEvents(0,
+        100);
+    Assert.assertEquals("Expecting 2 completion events for success", 2,
+        events.length);
+
+    // send updated nodes info
+    ArrayList<NodeReport> updatedNodes = new ArrayList<NodeReport>();
+    NodeReport nr = RecordFactoryProvider.getRecordFactory(null)
+        .newRecordInstance(NodeReport.class);
+    nr.setNodeId(node1);
+    nr.setNodeState(NodeState.UNHEALTHY);
+    updatedNodes.add(nr);
+    app.getContext().getEventHandler()
+        .handle(new JobUpdatedNodesEvent(job.getID(), updatedNodes));
+
+    app.waitForState(task1Attempt, TaskAttemptState.KILLED);
+    app.waitForState(task2Attempt, TaskAttemptState.KILLED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 2 more completion events for killed", 4,
+        events.length);
+
+    // all maps must be back to running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
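+    // Skip the killed first attempt and pick up the freshly started second one.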
+    Iterator<TaskAttempt> itr = mapTask1.getAttempts().values().iterator();
+    itr.next();
+    task1Attempt = itr.next();
+
+    // send the done signal to the task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // map1 must be succeeded. map2 must be running
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 5,
+        events.length);
+
+    // Crash the app again.
+    app.stop();
+
+    // rerun
+    // in rerun the 1st map will be recovered from previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    Task reduceTask = it.next();
+
+    // map 1 will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals(
+        "Expecting 2 completion events for killed & success of map1", 2,
+        events.length);
+
+    task2Attempt = mapTask2.getAttempts().values().iterator().next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 3,
+        events.length);
+
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt task3Attempt = reduceTask.getAttempts().values().iterator()
+        .next();
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task3Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.waitForState(reduceTask, TaskState.SUCCEEDED);
+
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Expecting 1 more completion events for success", 4,
+        events.length);
+
+    // job succeeds
+    app.waitForState(job, JobState.SUCCEEDED);
+  }
+
+  @Test
+  public void testJobError() throws Exception {
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    //send an invalid event on task at current state
+    app.getContext().getEventHandler().handle(
+        new TaskEvent(
+            task.getID(), TaskEventType.T_SCHEDULE));
+
+    //this must lead to job error
+    app.waitForState(job, JobState.ERROR);
+  }
+
+  private final class MRAppWithSpiedJob extends MRApp {
+    private JobImpl spiedJob;
+
+    private MRAppWithSpiedJob(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart);
+    }
+
+    @Override
+    protected Job createJob(Configuration conf) {
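+      // Wrap the real JobImpl in a Mockito spy so the test can count calls
+      // to constructFinalFullcounters().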
+      spiedJob = spy((JobImpl) super.createJob(conf));
+      ((AppContext) getContext()).getAllJobs().put(spiedJob.getID(), spiedJob);
+      return spiedJob;
+    }
+  }
+
+  @Test
+  public void testCountersOnJobFinish() throws Exception {
+    MRAppWithSpiedJob app =
+        new MRAppWithSpiedJob(1, 1, true, this.getClass().getName(), true);
+    JobImpl job = (JobImpl)app.submit(new Configuration());
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+    System.out.println(job.getAllCounters());
+    // Just call getAllCounters a couple more times
+    job.getAllCounters();
+    job.getAllCounters();
+    // Should be called only once
+    verify(job, times(1)).constructFinalFullcounters();
+  }
+
+  @Test
+  public void checkJobStateTypeConversion() {
+    //verify that all states can be converted without 
+    // throwing an exception
+    for (JobState state : JobState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+
+  @Test
+  public void checkTaskStateTypeConversion() {
+    //verify that all states can be converted without 
+    // throwing an exception
+    for (TaskState state : TaskState.values()) {
+      TypeConverter.fromYarn(state);
+    }
+  }
+  
+  private final class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
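+      // Use the real history event handler so job history is written out and
+      // a rerun (as in testUpdatedNodes) can recover completed tasks from it.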
+      JobHistoryEventHandler2 eventHandler = new JobHistoryEventHandler2(context, 
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestMRApp t = new TestMRApp();
+    t.testMapReduce();
+    t.testZeroMapReduces();
+    t.testCommitPending();
+    t.testCompletedMapsForReduceSlowstart();
+    t.testJobError();
+    t.testCountersOnJobFinish();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRAppMaster.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRAppMaster.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/TestMRAppMaster.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.junit.Test;
+
+public class TestMRAppMaster {
+  @Test
+  public void testMRAppMasterForDifferentUser() throws IOException,
+      InterruptedException {
+    String applicationAttemptIdStr = "appattempt_1317529182569_0004_000001";
+    String containerIdStr = "container_1317529182569_0004_000001_1";
+    String stagingDir = "/tmp/staging";
+    String userName = "TestAppMasterUser";
+    ApplicationAttemptId applicationAttemptId = ConverterUtils
+        .toApplicationAttemptId(applicationAttemptIdStr);
+    ContainerId containerId = ConverterUtils.toContainerId(containerIdStr);
+    MRAppMasterTest appMaster =
+        new MRAppMasterTest(applicationAttemptId, containerId, "host", -1, -1,
+            System.currentTimeMillis());
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    MRAppMaster.initAndStartAppMaster(appMaster, conf, userName);
+    Assert.assertEquals(stagingDir + Path.SEPARATOR + userName + Path.SEPARATOR
+        + ".staging", appMaster.stagingDirPath.toString());
+  }
+}
+
+class MRAppMasterTest extends MRAppMaster {
+
+  Path stagingDirPath;
+  private Configuration conf;
+
+  public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
+      ContainerId containerId, String host, int port, int httpPort,
+      long submitTime) {
+    super(applicationAttemptId, containerId, host, port, httpPort, submitTime);
+  }
+
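+  // init() and start() are overridden to capture the staging directory that
+  // would be used, without bringing up the real AM services.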
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+  }
+
+  @Override
+  public void start() {
+    try {
+      String user = UserGroupInformation.getCurrentUser().getShortUserName();
+      stagingDirPath = MRApps.getStagingAreaDir(conf, user);
+    } catch (Exception e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+}


