hadoop-mapreduce-commits mailing list archives

From ss...@apache.org
Subject svn commit: r1376283 [15/22] - in /hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-app2/ hadoop-mapreduce-client-app2/src/ hadoop-mapreduce-client-app2/src/main/ hadoop-mapreduce-client-app2/s...
Date Wed, 22 Aug 2012 22:11:48 GMT
Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskCounterGroupInfo {
+
+  protected String counterGroupName;
+  protected ArrayList<TaskCounterInfo> counter;
+
+  public TaskCounterGroupInfo() {
+  }
+
+  public TaskCounterGroupInfo(String name, CounterGroup group) {
+    this.counterGroupName = name;
+    this.counter = new ArrayList<TaskCounterInfo>();
+
+    for (Counter c : group) {
+      TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
+      this.counter.add(cinfo);
+    }
+  }
+}
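For reference, these dao classes are plain JAXB beans: @XmlAccessorType(XmlAccessType.FIELD) maps each field to an element, so the serialized names come from the field names. A minimal sketch of marshalling a TaskCounterGroupInfo to XML; the demo class and sample values are illustrative, not part of this commit, and it sits in the same package so it can touch the protected fields directly:

    package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;

    import java.util.ArrayList;

    import javax.xml.bind.JAXBContext;
    import javax.xml.bind.Marshaller;

    public class TaskCounterGroupInfoDemo {
      public static void main(String[] args) throws Exception {
        // Populate the DAO by hand; in the web app it is built from a CounterGroup.
        TaskCounterGroupInfo group = new TaskCounterGroupInfo();
        group.counterGroupName = "org.apache.hadoop.mapreduce.TaskCounter";
        group.counter = new ArrayList<TaskCounterInfo>();
        group.counter.add(new TaskCounterInfo("MAP_INPUT_RECORDS", 42L));

        Marshaller m = JAXBContext.newInstance(TaskCounterGroupInfo.class)
            .createMarshaller();
        m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, true);
        m.marshal(group, System.out);
        // Roughly:
        // <taskCounterGroupInfo>
        //   <counterGroupName>org.apache.hadoop.mapreduce.TaskCounter</counterGroupName>
        //   <counter><name>MAP_INPUT_RECORDS</name><value>42</value></counter>
        // </taskCounterGroupInfo>
      }
    }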

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "counter")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskCounterInfo {
+
+  protected String name;
+  protected long value;
+
+  public TaskCounterInfo() {
+  }
+
+  public TaskCounterInfo(String name, long value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public long getValue() {
+    return value;
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.util.Times;
+
+@XmlRootElement(name = "task")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskInfo {
+
+  protected long startTime;
+  protected long finishTime;
+  protected long elapsedTime;
+  protected float progress;
+  protected String id;
+  protected TaskState state;
+  protected String type;
+  protected String successfulAttempt;
+
+  @XmlTransient
+  int taskNum;
+
+  @XmlTransient
+  TaskAttempt successful;
+
+  public TaskInfo() {
+  }
+
+  public TaskInfo(Task task) {
+    TaskType ttype = task.getType();
+    this.type = ttype.toString();
+    TaskReport report = task.getReport();
+    this.startTime = report.getStartTime();
+    this.finishTime = report.getFinishTime();
+    this.elapsedTime = Times.elapsed(this.startTime, this.finishTime, false);
+    if (this.elapsedTime == -1) {
+      this.elapsedTime = 0;
+    }
+    this.state = report.getTaskState();
+    this.progress = report.getProgress() * 100;
+    this.id = MRApps.toString(task.getID());
+    this.taskNum = task.getID().getId();
+    this.successful = getSuccessfulAttempt(task);
+    if (successful != null) {
+      this.successfulAttempt = MRApps.toString(successful.getID());
+    } else {
+      this.successfulAttempt = "";
+    }
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+
+  public String getState() {
+    return this.state.toString();
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public int getTaskNum() {
+    return this.taskNum;
+  }
+
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public long getElapsedTime() {
+    return this.elapsedTime;
+  }
+
+  public String getSuccessfulAttempt() {
+    return this.successfulAttempt;
+  }
+
+  public TaskAttempt getSuccessful() {
+    return this.successful;
+  }
+
+  private TaskAttempt getSuccessfulAttempt(Task task) {
+    for (TaskAttempt attempt : task.getAttempts().values()) {
+      if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+        return attempt;
+      }
+    }
+    return null;
+  }
+
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "tasks")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TasksInfo {
+
+  protected ArrayList<TaskInfo> task = new ArrayList<TaskInfo>();
+
+  public TasksInfo() {
+  } // JAXB needs this
+
+  public void add(TaskInfo taskInfo) {
+    task.add(taskInfo);
+  }
+
+  public ArrayList<TaskInfo> getTasks() {
+    return task;
+  }
+
+}
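These DAOs are meant to be returned from the AM's JAX-RS web services, where the JAXB annotations drive both the XML and JSON renderings. A hedged sketch of the kind of resource method that would assemble a TasksInfo; the resource class, path, and Job lookup are illustrative (Job.getTasks() is the same accessor the tests later in this commit use):

    package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;

    import javax.ws.rs.GET;
    import javax.ws.rs.Path;
    import javax.ws.rs.PathParam;
    import javax.ws.rs.Produces;
    import javax.ws.rs.core.MediaType;

    import org.apache.hadoop.mapreduce.v2.app2.job.Job;
    import org.apache.hadoop.mapreduce.v2.app2.job.Task;

    @Path("/ws/v1/mapreduce")        // path is illustrative
    public class TasksResourceSketch {

      private Job job;               // in the real AM this would come from the AppContext

      @GET
      @Path("/jobs/{jobid}/tasks")
      @Produces({ MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML })
      public TasksInfo getJobTasks(@PathParam("jobid") String jobid) {
        // jobid would select the Job; here we wrap every task of the job in a
        // TaskInfo and collect them under the <tasks> root element.
        TasksInfo allTasks = new TasksInfo();
        for (Task task : job.getTasks().values()) {
          allTasks.add(new TaskInfo(task));
        }
        return allTasks;
      }
    }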

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Wed Aug 22 22:11:39 2012
@@ -0,0 +1 @@
+org.apache.hadoop.mapreduce.v2.app.MRClientSecurityInfo
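This one-line resource is a standard java.util.ServiceLoader provider-configuration file: the file name is the service interface and each line names an implementation class to instantiate. A minimal sketch of the lookup it enables (Hadoop performs this discovery internally; the loop below is illustrative):

    import java.util.ServiceLoader;

    import org.apache.hadoop.security.SecurityInfo;

    public class SecurityInfoLoaderSketch {
      public static void main(String[] args) {
        // Scans every META-INF/services/org.apache.hadoop.security.SecurityInfo
        // on the classpath and instantiates each provider listed in it.
        for (SecurityInfo info : ServiceLoader.load(SecurityInfo.class)) {
          System.out.println(info.getClass().getName());
        }
      }
    }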

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,149 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestTaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImpl2 {
+
+    public MockTaskAttemptListenerImpl(AppContext context,
+        JobTokenSecretManager jobTokenSecretManager,
+        TaskHeartbeatHandler hbHandler) {
+      super(context, hbHandler, null, jobTokenSecretManager);
+    }
+
+    @Override
+    protected void startRpcServer() {
+      //Empty
+    }
+    
+    @Override
+    protected void stopRpcServer() {
+      //Empty
+    }
+  }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testGetTask() throws IOException {
+    AppContext appCtx = mock(AppContext.class);
+    EventHandler mockHandler = mock(EventHandler.class);
+    AMContainerMap amContainers = mock(AMContainerMap.class);
+    
+    // Request to get a task for Container1 returns null.
+    ContainerId containerId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+    AMContainerImpl amContainer1 = mock(AMContainerImpl.class);
+    when(amContainer1.pullTaskAttempt()).thenReturn(null);
+    when(amContainers.get(containerId1)).thenReturn(amContainer1);
+    
+    Task task = mock(Task.class);
+    TaskAttemptID taID = new TaskAttemptID("1", 1, TaskType.MAP, 1, 1);
+    when(task.getTaskID()).thenReturn(taID);
+    
+    
+    // Request to get a task for Container2 returns task.
+    ContainerId containerId2 = BuilderUtils.newContainerId(1, 1, 1, 2);
+    AMContainerImpl amContainer2 = mock(AMContainerImpl.class);
+    when(amContainer2.pullTaskAttempt()).thenReturn(task);
+    when(amContainers.get(containerId2)).thenReturn(amContainer2);
+    
+    when(appCtx.getAllContainers()).thenReturn(amContainers);
+    when(appCtx.getContainer(containerId1)).thenReturn(amContainer1);
+    when(appCtx.getContainer(containerId2)).thenReturn(amContainer2);
+    when(appCtx.getEventHandler()).thenReturn(mockHandler);
+    
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
+    TaskHeartbeatHandler hbHandler = mock(TaskHeartbeatHandler.class);
+    
+    MockTaskAttemptListenerImpl listener = 
+      new MockTaskAttemptListenerImpl(appCtx, secret, hbHandler);
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+    
+    JVMId id1 = new JVMId("foo",1, true, 1);
+    WrappedJvmID wid1 = new WrappedJvmID(id1.getJobId(), id1.isMap, id1.getId());
+    JvmContext context1 = new JvmContext();
+    context1.jvmId = id1;
+    
+    JVMId id2 = new JVMId("foo",1, true, 2);
+    WrappedJvmID wid2 = new WrappedJvmID(id2.getJobId(), id2.isMap, id2.getId());
+    JvmContext context2 = new JvmContext();
+    context2.jvmId = id2;
+    
+    JvmTask result = null;
+    
+    // Verify ask before registration.
+    //The JVM ID has not been registered yet so we should kill it.
+    result = listener.getTask(context1);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+
+    // Verify ask after JVM registration, but before container is assigned a task.
+    listener.registerRunningJvm(wid1, containerId1);
+    result = listener.getTask(context1);
+    assertNull(result);
+    
+    // Verify ask after JVM registration, and when the container has a task.
+    listener.registerRunningJvm(wid2, containerId2);
+    result = listener.getTask(context2);
+    assertNotNull(result);
+    assertFalse(result.shouldDie);
+    assertTrue(result.getTask() == task);
+    ArgumentCaptor<Event> ac = ArgumentCaptor.forClass(Event.class);
+    verify(mockHandler).handle(ac.capture());
+    Event cEvent = ac.getValue();
+    assertTrue(cEvent.getClass().equals(TaskAttemptRemoteStartEvent.class));
+    TaskAttemptRemoteStartEvent event = (TaskAttemptRemoteStartEvent)cEvent; 
+    assertTrue(event.getType() == TaskAttemptEventType.TA_STARTED_REMOTELY);
+    assertTrue(event.getContainerId().equals(containerId2));
+    
+    // Verify ask after JVM is unregistered.
+    listener.unregisterRunningJvm(wid1);
+    result = listener.getTask(context1);
+    assertNotNull(result);
+    assertTrue(result.shouldDie);
+
+    listener.stop();
+  }
+}

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,400 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
+
+public class TestJobHistoryEventHandler {
+
+
+  private static final Log LOG = LogFactory
+      .getLog(TestJobHistoryEventHandler.class);
+
+  @Test
+  public void testFirstFlushOnCompletionEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
+            t.taskID, 0, TaskType.MAP, "")));
+      }
+      handleNextNEvents(jheh, 100);
+      verify(mockWriter, times(0)).flush();
+
+      // First completion event. The batched-flush queue-size threshold (200)
+      // is never reached, so this completion event flushes immediately.
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+          t.taskID, null, 0, TaskType.MAP, "", null)));
+      verify(mockWriter).flush();
+
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  @Test
+  public void testMaxUnflushedCompletionEvents() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 1);
+      verify(mockWriter).flush();
+      
+      handleNextNEvents(jheh, 50);
+      verify(mockWriter, times(6)).flush();
+      
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+  
+  @Test
+  public void testUnflushedTimer() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        2 * 1000l); //2 seconds.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      Thread.sleep(2 * 4 * 1000l); // 8 seconds; the 2-second flush timeout should fire well within this.
+      verify(mockWriter).flush();
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+  
+  @Test
+  public void testBatchedFlushJobEndMultiplier() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l); // 60 seconds, so the flush timer does not fire during this test.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+      queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
+
+      handleNextNEvents(jheh, 29);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 72);
+      verify(mockWriter, times(4)).flush(); //3 * 30 + 1 for JobFinished
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
+    jheh.handle(event);
+  }
+
+  private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
+      throws InterruptedException {
+    jheh.handle(event);
+    jheh.handleEvent(jheh.eventQueue.take());
+  }
+
+  private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
+      throws InterruptedException {
+    for (int i = 0; i < numEvents; i++) {
+      jheh.handleEvent(jheh.eventQueue.take());
+    }
+  }
+
+  private String setupTestWorkDir() {
+    File testWorkDir = new File("target", this.getClass().getCanonicalName());
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testWorkDir.getAbsolutePath()), true);
+      return testWorkDir.getAbsolutePath();
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup", e);
+      throw new YarnException("could not cleanup test dir", e);
+    }
+  }
+
+  private AppContext mockAppContext(JobId jobId) {
+    AppContext mockContext = mock(AppContext.class);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTotalMaps()).thenReturn(10);
+    when(mockJob.getTotalReduces()).thenReturn(10);
+    when(mockJob.getName()).thenReturn("mockjob");
+    when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    return mockContext;
+  }
+  
+
+  private class TestParams {
+    String workDir = setupTestWorkDir();
+    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    AppContext mockAppContext = mockAppContext(jobId);
+  }
+
+  private JobHistoryEvent getEventToEnqueue(JobId jobId) {
+    JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
+    HistoryEvent he = Mockito.mock(HistoryEvent.class);
+    Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
+    Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
+    Mockito.when(toReturn.getJobID()).thenReturn(jobId);
+    return toReturn;
+  }
+
+  /**
+   * Tests that in case of SIGTERM, the JHEH stops without processing its event
+   * queue (because we must stop quickly lest we get SIGKILLed) and processes
+   * a JobUnsuccessfulCompletionEvent for jobs which were still running (so
+   * that they may show up in the JobHistoryServer).
+   */
+  @Test
+  public void testSigTermedFunctionality() throws IOException {
+    AppContext mockedContext = Mockito.mock(AppContext.class);
+    JHEventHandlerForSigtermTest jheh =
+      new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    JobId jobId = Mockito.mock(JobId.class);
+    jheh.addToFileMap(jobId);
+
+    //Submit 4 events and check that they're handled in the absence of a signal
+    final int numEvents = 4;
+    JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    //Make sure events were handled
+    assertTrue("handleEvent should've been called only 4 times but was "
+      + jheh.eventsHandled, jheh.eventsHandled == 4);
+
+    //Create a new jheh because the last stop closed the eventWriter etc.
+    jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    // Make constructor of JobUnsuccessfulCompletionEvent pass
+    Job job = Mockito.mock(Job.class);
+    Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
+    // Make TypeConverter(JobID) pass
+    ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
+    Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
+    Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
+
+    jheh.addToFileMap(jobId);
+    jheh.setSignalled(true);
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    //Make sure events were handled, 4 + 1 finish event
+    assertTrue("handleEvent should've been called only 5 times but was "
+        + jheh.eventsHandled, jheh.eventsHandled == 5);
+    assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+        jheh.lastEventHandled.getHistoryEvent()
+        instanceof JobUnsuccessfulCompletionEvent);
+  }
+}
+
+class JHEvenHandlerForTest extends JobHistoryEventHandler2 {
+
+  private EventWriter eventWriter;
+  volatile int handleEventCompleteCalls = 0;
+  volatile int handleEventStartedCalls = 0;
+
+  public JHEvenHandlerForTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  @Override
+  public void start() {
+  }
+  
+  @Override
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    this.eventWriter = mock(EventWriter.class);
+    return this.eventWriter;
+  }
+
+  @Override
+  protected void closeEventWriter(JobId jobId) {
+  }
+  
+  public EventWriter getEventWriter() {
+    return this.eventWriter;
+  }
+}
+
+/**
+ * Class to help with testSigTermedFunctionality
+ */
+class JHEventHandlerForSigtermTest extends JobHistoryEventHandler2 {
+  private MetaInfo metaInfo;
+  public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  public void addToFileMap(JobId jobId) {
+    metaInfo = Mockito.mock(MetaInfo.class);
+    Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
+    fileMap.put(jobId, metaInfo);
+  }
+
+  JobHistoryEvent lastEventHandled;
+  int eventsHandled = 0;
+  @Override
+  protected void handleEvent(JobHistoryEvent event) {
+    this.lastEventHandled = event;
+    this.eventsHandled++;
+  }
+}
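Read together, the four timing tests pin down the flush policy fairly tightly. A rough paraphrase of the rule they exercise, inferred only from the configuration values and assertions above; the field and method names are illustrative, not the actual JobHistoryEventHandler2 internals:

    // Illustrative paraphrase of the flush policy exercised by the tests above.
    class FlushPolicySketch {
      // MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS
      int maxUnflushedCompletionEvents = 10;
      // MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER
      int jobCompleteUnflushedMultiplier = 3;
      // MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS; a timer also flushes any
      // pending completion events after this much time (testUnflushedTimer).
      long flushTimeoutMs = 60 * 1000L;

      int pendingCompletionEvents = 0;
      boolean jobFinishedEventPending = false;

      boolean shouldFlushNow() {
        // Once a job-finished event is pending, the tolerated backlog grows by
        // the multiplier: 10 * 3 = 30, matching the flushes at 30/60/90 events
        // plus one for the JobFinishedEvent in testBatchedFlushJobEndMultiplier.
        int limit = jobFinishedEventPending
            ? maxUnflushedCompletionEvents * jobCompleteUnflushedMultiplier
            : maxUnflushedCompletionEvents;
        return pendingCompletionEvents >= limit;
      }
    }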

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRApp.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,739 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.app2;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.WrappedJvmID;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.JobFinishEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptRemoteStartEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app2.launcher.ContainerLauncher;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTALaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.AMSchedulerTAStopRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+import org.apache.hadoop.mapreduce.v2.app2.rm.NMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorContainerDeAllocateRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMCommunicatorEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerRequestor;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainer;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerAssignTAEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventLaunched;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventReleased;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerLaunchRequestEvent;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerState;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleaner;
+import org.apache.hadoop.mapreduce.v2.app2.taskclean.TaskCleanupEvent;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.Clock;
+import org.apache.hadoop.yarn.ClusterInfo;
+import org.apache.hadoop.yarn.SystemClock;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.service.Service;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Mock MRAppMaster. Doesn't start RPC servers.
+ * No threads are started except for the event dispatcher thread.
+ */
+@SuppressWarnings("unchecked")
+public class MRApp extends MRAppMaster {
+  private static final Log LOG = LogFactory.getLog(MRApp.class);
+
+  int maps;
+  int reduces;
+
+  private File testWorkDir;
+  private Path testAbsPath;
+  private ClusterInfo clusterInfo;
+
+  public static String NM_HOST = "localhost";
+  public static int NM_PORT = 1234;
+  public static int NM_HTTP_PORT = 8042;
+
+  private static final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  //if true, tasks complete automatically as soon as they are launched
+  protected boolean autoComplete = false;
+
+  static ApplicationId applicationId;
+
+  static {
+    applicationId = recordFactory.newRecordInstance(ApplicationId.class);
+    applicationId.setClusterTimestamp(0);
+    applicationId.setId(0);
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, Clock clock) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1, clock);
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, 1);
+  }
+  
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount) {
+    this(maps, reduces, autoComplete, testName, cleanOnStart, startCount,
+        new SystemClock());
+  }
+
+  public MRApp(int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount, Clock clock) {
+    this(getApplicationAttemptId(applicationId, startCount), getContainerId(
+      applicationId, startCount), maps, reduces, autoComplete, testName,
+      cleanOnStart, startCount, clock);
+  }
+
+  public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+      int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount) {
+    this(appAttemptId, amContainerId, maps, reduces, autoComplete, testName,
+        cleanOnStart, startCount, new SystemClock());
+  }
+
+  public MRApp(ApplicationAttemptId appAttemptId, ContainerId amContainerId,
+      int maps, int reduces, boolean autoComplete, String testName,
+      boolean cleanOnStart, int startCount, Clock clock) {
+    super(appAttemptId, amContainerId, NM_HOST, NM_PORT, NM_HTTP_PORT, clock, System
+        .currentTimeMillis());
+    this.testWorkDir = new File("target", testName);
+    testAbsPath = new Path(testWorkDir.getAbsolutePath());
+    LOG.info("PathUsed: " + testAbsPath);
+    if (cleanOnStart) {
+      try {
+        FileContext.getLocalFSFileContext().delete(testAbsPath, true);
+      } catch (Exception e) {
+        LOG.warn("COULD NOT CLEANUP: " + testAbsPath, e);
+        throw new YarnException("could not cleanup test dir", e);
+      }
+    }
+
+    this.maps = maps;
+    this.reduces = reduces;
+    this.autoComplete = autoComplete;
+  }
+
+  
+  
+  @Override
+  public void init(Configuration conf) {
+    super.init(conf);
+    if (this.clusterInfo != null) {
+      getContext().getClusterInfo().setMinContainerCapability(
+          this.clusterInfo.getMinContainerCapability());
+      getContext().getClusterInfo().setMaxContainerCapability(
+          this.clusterInfo.getMaxContainerCapability());
+    } else {
+      getContext().getClusterInfo().setMinContainerCapability(
+          BuilderUtils.newResource(1024));
+      getContext().getClusterInfo().setMaxContainerCapability(
+          BuilderUtils.newResource(10240));
+    }
+    // XXX Any point doing this here. Otherwise move to an overridden createDispatcher()
+//    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, false);
+  }
+
+  public Job submit(Configuration conf) throws Exception {
+    String user = conf.get(MRJobConfig.USER_NAME, UserGroupInformation
+      .getCurrentUser().getShortUserName());
+    conf.set(MRJobConfig.USER_NAME, user);
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, testAbsPath.toString());
+    conf.setBoolean(MRJobConfig.MR_AM_CREATE_JH_INTERMEDIATE_BASE_DIR, true);
+    //TODO: fix the bug where the speculator gets events with 
+    //not-fully-constructed objects. For now, disable speculative exec
+    LOG.info("****DISABLING SPECULATIVE EXECUTION*****");
+    conf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false);
+    conf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false);
+
+    init(conf);
+    start();
+    DefaultMetricsSystem.shutdown();
+    Job job = getContext().getAllJobs().values().iterator().next();
+
+    // Write job.xml
+    String jobFile = MRApps.getJobFile(conf, user,
+      TypeConverter.fromYarn(job.getID()));
+    LOG.info("Writing job conf to " + jobFile);
+    new File(jobFile).getParentFile().mkdirs();
+    conf.writeXml(new FileOutputStream(jobFile));
+
+    return job;
+  }
+
+  public void waitForState(TaskAttempt attempt, 
+      TaskAttemptState finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskAttemptReport report = attempt.getReport();
+    while (!finalState.equals(report.getTaskAttemptState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("TaskAttempt State is : " + report.getTaskAttemptState() +
+          " Waiting for state : " + finalState +
+          "   progress : " + report.getProgress());
+      report = attempt.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("TaskAttempt State is : " + report.getTaskAttemptState());
+    Assert.assertEquals("TaskAttempt state is not correct (timedout)",
+        finalState, 
+        report.getTaskAttemptState());
+  }
+
+  public void waitForState(Task task, TaskState finalState) throws Exception {
+    int timeoutSecs = 0;
+    TaskReport report = task.getReport();
+    while (!finalState.equals(report.getTaskState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("Task State for " + task.getID() + " is : "
+          + report.getTaskState() + " Waiting for state : " + finalState
+          + "   progress : " + report.getProgress());
+      report = task.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("Task State is : " + report.getTaskState());
+    Assert.assertEquals("Task state is not correct (timedout)", finalState, 
+        report.getTaskState());
+  }
+
+  public void waitForState(Job job, JobState finalState) throws Exception {
+    int timeoutSecs = 0;
+    JobReport report = job.getReport();
+    while (!finalState.equals(report.getJobState()) &&
+        timeoutSecs++ < 20) {
+      System.out.println("Job State is : " + report.getJobState() +
+          " Waiting for state : " + finalState +
+          "   map progress : " + report.getMapProgress() + 
+          "   reduce progress : " + report.getReduceProgress());
+      report = job.getReport();
+      Thread.sleep(500);
+    }
+    System.out.println("Job State is : " + report.getJobState());
+    Assert.assertEquals("Job state is not correct (timedout)", finalState, 
+        job.getState());
+  }
+
+  public void waitForState(Service.STATE finalState) throws Exception {
+    int timeoutSecs = 0;
+    while (!finalState.equals(getServiceState()) && timeoutSecs++ < 20) {
+      System.out.println("MRApp State is : " + getServiceState()
+          + " Waiting for state : " + finalState);
+      Thread.sleep(500);
+    }
+    System.out.println("MRApp State is : " + getServiceState());
+    Assert.assertEquals("MRApp state is not correct (timedout)", finalState,
+        getServiceState());
+  }
+
+  public void verifyCompleted() {
+    for (Job job : getContext().getAllJobs().values()) {
+      JobReport jobReport = job.getReport();
+      System.out.println("Job start time :" + jobReport.getStartTime());
+      System.out.println("Job finish time :" + jobReport.getFinishTime());
+      Assert.assertTrue("Job start time is not less than finish time",
+          jobReport.getStartTime() <= jobReport.getFinishTime());
+      Assert.assertTrue("Job finish time is in future",
+          jobReport.getFinishTime() <= System.currentTimeMillis());
+      for (Task task : job.getTasks().values()) {
+        TaskReport taskReport = task.getReport();
+        System.out.println("Task start time : " + taskReport.getStartTime());
+        System.out.println("Task finish time : " + taskReport.getFinishTime());
+        Assert.assertTrue("Task start time is not less than finish time",
+            taskReport.getStartTime() <= taskReport.getFinishTime());
+        for (TaskAttempt attempt : task.getAttempts().values()) {
+          TaskAttemptReport attemptReport = attempt.getReport();
+          Assert.assertTrue("Attempt start time is not less than finish time",
+              attemptReport.getStartTime() <= attemptReport.getFinishTime());
+        }
+      }
+    }
+  }
+
+  @Override
+  protected void downloadTokensAndSetupUGI(Configuration conf) {
+  }
+
+  private static ApplicationAttemptId getApplicationAttemptId(
+      ApplicationId applicationId, int startCount) {
+    ApplicationAttemptId applicationAttemptId =
+        recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    applicationAttemptId.setApplicationId(applicationId);
+    applicationAttemptId.setAttemptId(startCount);
+    return applicationAttemptId;
+  }
+  
+  private static ContainerId getContainerId(ApplicationId applicationId,
+      int startCount) {
+    ApplicationAttemptId appAttemptId =
+        getApplicationAttemptId(applicationId, startCount);
+    ContainerId containerId =
+        BuilderUtils.newContainerId(appAttemptId, startCount);
+    return containerId;
+  }
+  
+  @Override
+  protected Job createJob(Configuration conf) {
+    UserGroupInformation currentUser = null;
+    try {
+      currentUser = UserGroupInformation.getCurrentUser();
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+    Job newJob = new TestJob(getJobId(), getAttemptID(), conf,
+        getDispatcher().getEventHandler(), getTaskAttemptListener(),
+        getContext().getClock(), getCommitter(), isNewApiCommitter(),
+        currentUser.getUserName(), getTaskHeartbeatHandler(), getContext());
+    ((AppContext) getContext()).getAllJobs().put(newJob.getID(), newJob);
+
+    getDispatcher().register(JobFinishEvent.Type.class, new MRAppJobFinishHandler());
+
+    return newJob;
+  }
+  
+  protected class MRAppJobFinishHandler extends JobFinishEventHandlerCR {
+    @Override
+    protected void exit() {
+    }
+    
+    @Override
+    protected void maybeSendJobEndNotification() {
+    }
+  }
+
+  @Override
+  protected TaskAttemptListener createTaskAttemptListener(AppContext context,
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
+    return new TaskAttemptListener(){
+
+      @Override
+      public InetSocketAddress getAddress() {
+        return NetUtils.createSocketAddr("localhost:54321");
+      }
+      
+      @Override
+      public void registerRunningJvm(WrappedJvmID jvmID, ContainerId containerId) {
+        // TODO Auto-generated method stub
+        
+      }
+      @Override
+      public void unregisterRunningJvm(WrappedJvmID jvmID) {
+        // TODO Auto-generated method stub
+        
+      }
+      @Override
+      public void unregisterTaskAttempt(TaskAttemptId attemptID) {
+        // TODO Auto-generated method stub
+        
+      }
+
+      @Override
+      public void registerTaskAttempt(TaskAttemptId attemptId,
+          WrappedJvmID jvmId) {
+        // TODO Auto-generated method stub
+        
+      }
+    };
+  }
+  
+  @Override
+  protected TaskHeartbeatHandler createTaskHeartbeatHandler(AppContext context,
+      Configuration conf) {
+    return new TaskHeartbeatHandler(context, maps) {
+
+      @Override
+      public void init(Configuration conf) {
+      }
+
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public void stop() {
+      }
+    };
+  }
+
+  @Override
+  protected ContainerHeartbeatHandler createContainerHeartbeatHandler(
+      AppContext context, Configuration conf) {
+    return new ContainerHeartbeatHandler(context, 1) {
+      @Override
+      public void init(Configuration conf) {
+      }
+
+      @Override
+      public void start() {
+      }
+
+      @Override
+      public void stop() {
+      }
+    };
+  }
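+
+  // Both heartbeat handlers above are returned with inert init/start/stop
+  // methods, so no timeout-monitoring threads run while the tests execute.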
+
+  @Override
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) { // disable history
+    return new EventHandler<JobHistoryEvent>() {
+      @Override
+      public void handle(JobHistoryEvent event) {
+      }
+    };
+  }
+
+  @Override
+  protected ContainerLauncher createContainerLauncher(AppContext context) {
+    return new MockContainerLauncher();
+  }
+
+  // appAcls and attemptToContainerIdMap shared between various mocks.
+  private Map<ApplicationAccessType, String> appAcls = new HashMap<ApplicationAccessType, String>();
+  private Map<TaskAttemptId, ContainerId> attemptToContainerIdMap = new HashMap<TaskAttemptId, ContainerId>();
+  
+  protected class MockContainerLauncher implements ContainerLauncher {
+
+    // We are running locally, so set the shuffle port to -1.
+    int shufflePort = -1;
+
+    public MockContainerLauncher() {
+    }
+
+//    @Override
+    public void handle(NMCommunicatorEvent event) {
+      switch (event.getType()) {
+      case CONTAINER_LAUNCH_REQUEST:
+        LOG.info("XXX: Handling CONTAINER_LAUNCH_REQUEST for: " + event.getContainerId());
+        
+        AMContainer amContainer = getContext().getContainer(event.getContainerId());
+        TaskAttemptId attemptIdForContainer = amContainer.getQueuedTaskAttempts().iterator().next();
+        // Container Launched.
+        getContext().getEventHandler().handle(
+            new AMContainerEventLaunched(event.getContainerId(), shufflePort));
+        
+        // Simulate a TaskPull from the remote task.
+        getContext().getEventHandler().handle(
+            new AMContainerEvent(event.getContainerId(),
+                AMContainerEventType.C_PULL_TA));
+         
+        // Simulate a TaskAttemptStartedEvent to the TaskAttempt.
+        // Maybe simulate a completed task.
+        getContext().getEventHandler().handle(
+            new TaskAttemptRemoteStartEvent(attemptIdForContainer, event.getContainerId(), appAcls,
+                shufflePort));
+        attemptLaunched(attemptIdForContainer);
+
+        break;
+      case CONTAINER_STOP_REQUEST:
+        ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+        cs.setContainerId(event.getContainerId());
+        getContext().getEventHandler().handle(new AMContainerEventReleased(cs));
+        break;
+      }
+    }
+  }
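+
+  // The mock launcher collapses the real container lifecycle into a single
+  // synchronous sequence: a CONTAINER_LAUNCH_REQUEST is acknowledged with a
+  // launched event, a C_PULL_TA is faked on behalf of the task JVM, and the
+  // queued attempt is started remotely, so no actual container is ever forked.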
+
+  protected void attemptLaunched(TaskAttemptId attemptId) {
+    if (autoComplete) {
+      // send the done event
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_DONE));
+    }
+  }
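+
+  // A minimal sketch (hypothetical, not part of this patch) of how a test
+  // subclass could override attemptLaunched() to exercise retry handling by
+  // failing the first attempt instead of auto-completing it:
+  //
+  //   @Override
+  //   protected void attemptLaunched(TaskAttemptId attemptId) {
+  //     if (attemptId.getId() == 0) {
+  //       getContext().getEventHandler().handle(
+  //           new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_FAILMSG));
+  //     } else {
+  //       super.attemptLaunched(attemptId);
+  //     }
+  //   }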
+
+  @Override
+  protected RMContainerRequestor createRMContainerRequestor(
+      ClientService clientService, AppContext appContext) {
+    return new MRAppContainerRequestor(clientService, appContext);
+  }
+  
+  protected class MRAppContainerRequestor extends RMContainerRequestor {
+
+    int numReleaseRequests;
+    
+    public MRAppContainerRequestor(ClientService clientService,
+        AppContext context) {
+      super(clientService, context);
+    }
+    
+    @Override public void init(Configuration conf) {}
+    @Override public void start() {}
+    @Override public void stop() {}
+    //TODO XXX: getApplicationAcls, getJob
+    
+    @Override public void addContainerReq(ContainerRequest req) {}
+    @Override public void decContainerReq(ContainerRequest req) {}
+
+    public void handle(RMCommunicatorEvent rawEvent) {
+      LOG.info("XXX: MRAppContainerRequestor handling event of type:" + rawEvent.getType() + ", event: " + rawEvent);
+      switch (rawEvent.getType()) {
+      case CONTAINER_DEALLOCATE:
+        numReleaseRequests++;
+        ContainerStatus cs = Records.newRecord(ContainerStatus.class);
+        cs.setContainerId(((RMCommunicatorContainerDeAllocateRequestEvent)rawEvent).getContainerId());
+        getContext().getEventHandler().handle(new AMContainerEventReleased(cs));
+        break;
+      default:
+        LOG.warn("Invalid event of type: " + rawEvent.getType() + ", Event: "
+            + rawEvent);
+        break;
+      }
+    }
+
+    public int getNumReleaseRequests() {
+      return numReleaseRequests;
+    }
+  }
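+
+  // numReleaseRequests counts the CONTAINER_DEALLOCATE events handled above,
+  // so a test can assert how many release requests were issued via
+  // getNumReleaseRequests() without involving a real ResourceManager.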
+ 
+  @Override
+  protected ContainerAllocator createAMScheduler(
+      RMContainerRequestor requestor, AppContext appContext) {
+    return new MRAppAMScheduler();    
+  }
+
+  protected class MRAppAMScheduler implements ContainerAllocator {
+    private int containerCount;
+
+    @Override
+    public void handle(AMSchedulerEvent rawEvent) {
+      LOG.info("XXX: MRAppAMScheduler handling event of type:" + rawEvent.getType() + ", event: " + rawEvent);
+      switch (rawEvent.getType()) {
+      case S_TA_LAUNCH_REQUEST:
+        AMSchedulerTALaunchRequestEvent lEvent = (AMSchedulerTALaunchRequestEvent)rawEvent;
+        ContainerId cId = Records.newRecord(ContainerId.class);
+        cId.setApplicationAttemptId(getContext().getApplicationAttemptId());
+        cId.setId(containerCount++);
+        NodeId nodeId = BuilderUtils.newNodeId(NM_HOST, NM_PORT);
+        Container container = BuilderUtils.newContainer(cId, nodeId,
+            NM_HOST + ":" + NM_HTTP_PORT, null, null, null);
+        
+        getContext().getAllContainers().addNewContainer(container);
+        getContext().getAllNodes().nodeSeen(nodeId);
+        
+        JobID id = TypeConverter.fromYarn(applicationId);
+        JobId jobId = TypeConverter.toYarn(id);
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(TaskType.REDUCE, 100)));
+        getContext().getEventHandler().handle(new JobHistoryEvent(jobId, 
+            new NormalizedResourceEvent(TaskType.MAP, 100)));
+        
+        attemptToContainerIdMap.put(lEvent.getAttemptID(), cId);
+        if (getContext().getContainer(cId).getState() == AMContainerState.ALLOCATED) {
+          LOG.info("XXX: Sending launch request for container: " + lEvent);
+          getContext().getEventHandler().handle(
+              new AMContainerLaunchRequestEvent(cId, lEvent, appAcls, jobId));
+        }
+        LOG.info("XXX: Assigning attempt [" + lEvent.getAttemptID() + "] to Container [" + cId + "]");
+        getContext().getEventHandler().handle(
+            new AMContainerAssignTAEvent(cId, lEvent.getAttemptID(), lEvent
+                .getRemoteTask()));
+
+        break;
+      case S_TA_STOP_REQUEST:
+        // Send out a Container_stop_request.
+        AMSchedulerTAStopRequestEvent stEvent = (AMSchedulerTAStopRequestEvent) rawEvent;
+        getContext().getEventHandler().handle(
+            new AMContainerEvent(attemptToContainerIdMap.get(stEvent
+                .getAttemptID()), AMContainerEventType.C_STOP_REQUEST));
+
+        break;
+      case S_TA_SUCCEEDED:
+        break;
+      case S_CONTAINERS_ALLOCATED:
+        break;
+      case S_CONTAINER_COMPLETED:
+        break;
+      default:
+        break;
+      }
+    }
+  }
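+
+  // The scheduler stub mints a new ContainerId for each S_TA_LAUNCH_REQUEST,
+  // registers the container and node with the context, remembers the
+  // attempt-to-container mapping, and immediately issues the launch and assign
+  // events, bypassing the RM allocate protocol entirely.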
+
+  @Override
+  protected TaskCleaner createTaskCleaner(AppContext context) {
+    return new TaskCleaner() {
+      @Override
+      public void handle(TaskCleanupEvent event) {
+        //send the cleanup done event
+//        getContext().getEventHandler().handle(
+//            new TaskAttemptEvent(event.getAttemptID(),
+//                TaskAttemptEventType.TA_CLEANUP_DONE));
+        
+        // TODO XXX: Kind of equivalent to saying the container is complete / released.
+      }
+    };
+  }
+
+  @Override
+  protected ClientService createClientService(AppContext context) {
+    return new ClientService(){
+      @Override
+      public InetSocketAddress getBindAddress() {
+        return NetUtils.createSocketAddr("localhost:9876");
+      }
+
+      @Override
+      public int getHttpPort() {
+        return -1;
+      }
+    };
+  }
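+
+  // The stub ClientService never starts an RPC server; it only reports a
+  // fixed bind address and an invalid HTTP port.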
+
+  public void setClusterInfo(ClusterInfo clusterInfo) {
+    // Only useful if set before a job is started.
+    if (getServiceState() == Service.STATE.NOTINITED
+        || getServiceState() == Service.STATE.INITED) {
+      this.clusterInfo = clusterInfo;
+    } else {
+      throw new IllegalStateException(
+          "ClusterInfo can only be set before the App is STARTED");
+    }
+  }
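+
+  // Usage sketch (hypothetical values, not part of this patch): inject cluster
+  // resource limits between construction and start, e.g.
+  //
+  //   MRApp app = new MRApp(2, 2, true, "test", true);
+  //   app.setClusterInfo(clusterInfoWithDesiredCapabilities);
+  //   Job job = app.submit(new Configuration());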
+
+  class TestJob extends JobImpl {
+    //override the init transition
+    private final TestInitTransition initTransition = new TestInitTransition(
+        maps, reduces);
+    StateMachineFactory<JobImpl, JobState, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobState.NEW,
+            EnumSet.of(JobState.INITED, JobState.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
+    private final StateMachine<JobState, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobState, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    @SuppressWarnings("rawtypes")
+    public TestJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        TaskAttemptListener taskAttemptListener,  Clock clock,
+        OutputCommitter committer, boolean newApiCommitter, String user,
+        TaskHeartbeatHandler thh, AppContext appContext) {
+      super(jobId, getApplicationAttemptId(applicationId, getStartCount()),
+          conf, eventHandler, taskAttemptListener,
+          new JobTokenSecretManager(), new Credentials(), clock,
+          getCompletedTaskFromPreviousRun(), metrics, committer,
+          newApiCommitter, user, System.currentTimeMillis(), getAllAMInfos(),
+          thh, appContext);
+
+      // This "this leak" is okay because the retained pointer is in an
+      //  instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+  }
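+
+  // TestJob swaps in TestInitTransition by extending the shared
+  // stateMachineFactory with a replacement NEW -> (INITED|FAILED) transition
+  // for JOB_INIT and building a per-instance state machine from the resulting
+  // local factory; getStateMachine() then routes every event through it.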
+
+  // Override InitTransition so the test does not look for split files etc.
+  static class TestInitTransition extends JobImpl.InitTransition {
+    private int maps;
+    private int reduces;
+    TestInitTransition(int maps, int reduces) {
+      this.maps = maps;
+      this.reduces = reduces;
+    }
+    @Override
+    protected void setup(JobImpl job) throws IOException {
+      super.setup(job);
+      job.conf.setInt(MRJobConfig.NUM_REDUCES, reduces);
+      job.remoteJobConfFile = new Path("test");
+    }
+    @Override
+    protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
+      TaskSplitMetaInfo[] splits = new TaskSplitMetaInfo[maps];
+      for (int i = 0; i < maps ; i++) {
+        splits[i] = new TaskSplitMetaInfo();
+      }
+      return splits;
+    }
+  }
+
+}
+ 

Added: hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java?rev=1376283&view=auto
==============================================================================
--- hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java (added)
+++ hadoop/common/branches/MR-3902/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app2/src/test/java/org/apache/hadoop/mapreduce/v2/app2/MRAppBenchmark.java Wed Aug 22 22:11:39 2012
@@ -0,0 +1,283 @@
+///**
+//* Licensed to the Apache Software Foundation (ASF) under one
+//* or more contributor license agreements.  See the NOTICE file
+//* distributed with this work for additional information
+//* regarding copyright ownership.  The ASF licenses this file
+//* to you under the Apache License, Version 2.0 (the
+//* "License"); you may not use this file except in compliance
+//* with the License.  You may obtain a copy of the License at
+//*
+//*     http://www.apache.org/licenses/LICENSE-2.0
+//*
+//* Unless required by applicable law or agreed to in writing, software
+//* distributed under the License is distributed on an "AS IS" BASIS,
+//* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//* See the License for the specific language governing permissions and
+//* limitations under the License.
+//*/
+//
+//package org.apache.hadoop.mapreduce.v2.app2;
+//
+//import java.util.ArrayList;
+//import java.util.List;
+//import java.util.concurrent.BlockingQueue;
+//import java.util.concurrent.LinkedBlockingQueue;
+//
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+//import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+//import org.apache.hadoop.mapreduce.v2.app2.client.ClientService;
+//import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+//import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptContainerAssignedEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocator;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.ContainerAllocatorEvent;
+//import org.apache.hadoop.mapreduce.v2.app2.rm.RMContainerAllocator;
+//import org.apache.hadoop.yarn.YarnException;
+//import org.apache.hadoop.yarn.api.AMRMProtocol;
+//import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+//import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+//import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+//import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+//import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+//import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+//import org.apache.hadoop.yarn.api.records.AMResponse;
+//import org.apache.hadoop.yarn.api.records.Container;
+//import org.apache.hadoop.yarn.api.records.ContainerId;
+//import org.apache.hadoop.yarn.api.records.NodeId;
+//import org.apache.hadoop.yarn.api.records.ResourceRequest;
+//import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
+//import org.apache.hadoop.yarn.factories.RecordFactory;
+//import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+//import org.apache.hadoop.yarn.service.AbstractService;
+//import org.apache.hadoop.yarn.util.BuilderUtils;
+//import org.apache.hadoop.yarn.util.Records;
+//import org.apache.log4j.Level;
+//import org.apache.log4j.LogManager;
+//import org.apache.log4j.Logger;
+//import org.junit.Test;
+//
+//public class MRAppBenchmark {
+//
+//  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+//
+//  /**
+//   * Runs memory and time benchmark with Mock MRApp.
+//   */
+//  public void run(MRApp app) throws Exception {
+//    Logger rootLogger = LogManager.getRootLogger();
+//    rootLogger.setLevel(Level.WARN);
+//    long startTime = System.currentTimeMillis();
+//    Job job = app.submit(new Configuration());
+//    while (!job.getReport().getJobState().equals(JobState.SUCCEEDED)) {
+//      printStat(job, startTime);
+//      Thread.sleep(2000);
+//    }
+//    printStat(job, startTime);
+//  }
+//
+//  private void printStat(Job job, long startTime) throws Exception {
+//    long currentTime = System.currentTimeMillis();
+//    Runtime.getRuntime().gc();
+//    long mem = Runtime.getRuntime().totalMemory() 
+//      - Runtime.getRuntime().freeMemory();
+//    System.out.println("JobState:" + job.getState() +
+//        " CompletedMaps:" + job.getCompletedMaps() +
+//        " CompletedReduces:" + job.getCompletedReduces() +
+//        " Memory(total-free)(KB):" + mem/1024 +
+//        " ElapsedTime(ms):" + (currentTime - startTime));
+//  }
+//
+//  //Throttles the maximum number of concurrent running tasks.
+//  //This affects the memory requirement since 
+//  //org.apache.hadoop.mapred.MapTask/ReduceTask is loaded in memory for all
+//  //running task and discarded once the task is launched.
+//  static class ThrottledMRApp extends MRApp {
+//
+//    int maxConcurrentRunningTasks;
+//    volatile int concurrentRunningTasks;
+//    ThrottledMRApp(int maps, int reduces, int maxConcurrentRunningTasks) {
+//      super(maps, reduces, true, "ThrottledMRApp", true);
+//      this.maxConcurrentRunningTasks = maxConcurrentRunningTasks;
+//    }
+//    
+//    @Override
+//    protected void attemptLaunched(TaskAttemptId attemptID) {
+//      super.attemptLaunched(attemptID);
+//      //the task is launched and sends done immediately
+//      concurrentRunningTasks--;
+//    }
+//    
+//    @Override
+//    protected ContainerAllocator createContainerAllocator(
+//        ClientService clientService, AppContext context) {
+//      return new ThrottledContainerAllocator();
+//    }
+//    
+//    class ThrottledContainerAllocator extends AbstractService 
+//        implements ContainerAllocator {
+//      private int containerCount;
+//      private Thread thread;
+//      private BlockingQueue<ContainerAllocatorEvent> eventQueue =
+//        new LinkedBlockingQueue<ContainerAllocatorEvent>();
+//      public ThrottledContainerAllocator() {
+//        super("ThrottledContainerAllocator");
+//      }
+//      @Override
+//      public void handle(ContainerAllocatorEvent event) {
+//        try {
+//          eventQueue.put(event);
+//        } catch (InterruptedException e) {
+//          throw new YarnException(e);
+//        }
+//      }
+//      @Override
+//      public void start() {
+//        thread = new Thread(new Runnable() {
+//          @Override
+//          public void run() {
+//            ContainerAllocatorEvent event = null;
+//            while (!Thread.currentThread().isInterrupted()) {
+//              try {
+//                if (concurrentRunningTasks < maxConcurrentRunningTasks) {
+//                  event = eventQueue.take();
+//                  ContainerId cId = 
+//                      recordFactory.newRecordInstance(ContainerId.class);
+//                  cId.setApplicationAttemptId(
+//                      getContext().getApplicationAttemptId());
+//                  cId.setId(containerCount++);
+//                  //System.out.println("Allocating " + containerCount);
+//                  
+//                  Container container = 
+//                      recordFactory.newRecordInstance(Container.class);
+//                  container.setId(cId);
+//                  NodeId nodeId = recordFactory.newRecordInstance(NodeId.class);
+//                  nodeId.setHost("dummy");
+//                  nodeId.setPort(1234);
+//                  container.setNodeId(nodeId);
+//                  container.setContainerToken(null);
+//                  container.setNodeHttpAddress("localhost:8042");
+//                  getContext().getEventHandler()
+//                      .handle(
+//                      new TaskAttemptContainerAssignedEvent(event
+//                          .getAttemptID(), container, null));
+//                  concurrentRunningTasks++;
+//                } else {
+//                  Thread.sleep(1000);
+//                }
+//              } catch (InterruptedException e) {
+//                System.out.println("Returning, interrupted");
+//                return;
+//              }
+//            }
+//          }
+//        });
+//        thread.start();
+//        super.start();
+//      }
+//
+//      @Override
+//      public void stop() {
+//        thread.interrupt();
+//        super.stop();
+//      }
+//    }
+//  }
+//
+//  @Test
+//  public void benchmark1() throws Exception {
+//    int maps = 100; // Adjust for benchmarking. Start with thousands.
+//    int reduces = 0;
+//    System.out.println("Running benchmark with maps:"+maps +
+//        " reduces:"+reduces);
+//    run(new MRApp(maps, reduces, true, this.getClass().getName(), true) {
+//
+//      @Override
+//      protected ContainerAllocator createContainerAllocator(
+//          ClientService clientService, AppContext context) {
+//        return new RMContainerAllocator(clientService, context) {
+//          @Override
+//          protected AMRMProtocol createSchedulerProxy() {
+//            return new AMRMProtocol() {
+//
+//              @Override
+//              public RegisterApplicationMasterResponse
+//                  registerApplicationMaster(
+//                      RegisterApplicationMasterRequest request)
+//                      throws YarnRemoteException {
+//                RegisterApplicationMasterResponse response =
+//                    Records.newRecord(RegisterApplicationMasterResponse.class);
+//                response.setMinimumResourceCapability(BuilderUtils
+//                  .newResource(1024));
+//                response.setMaximumResourceCapability(BuilderUtils
+//                  .newResource(10240));
+//                return response;
+//              }
+//
+//              @Override
+//              public FinishApplicationMasterResponse finishApplicationMaster(
+//                  FinishApplicationMasterRequest request)
+//                  throws YarnRemoteException {
+//                FinishApplicationMasterResponse response =
+//                    Records.newRecord(FinishApplicationMasterResponse.class);
+//                return response;
+//              }
+//
+//              @Override
+//              public AllocateResponse allocate(AllocateRequest request)
+//                  throws YarnRemoteException {
+//
+//                AllocateResponse response =
+//                    Records.newRecord(AllocateResponse.class);
+//                List<ResourceRequest> askList = request.getAskList();
+//                List<Container> containers = new ArrayList<Container>();
+//                for (ResourceRequest req : askList) {
+//                  if (!"*".equals(req.getHostName())) {
+//                    continue;
+//                  }
+//                  int numContainers = req.getNumContainers();
+//                  for (int i = 0; i < numContainers; i++) {
+//                    ContainerId containerId =
+//                        BuilderUtils.newContainerId(
+//                          request.getApplicationAttemptId(),
+//                          request.getResponseId() + i);
+//                    containers.add(BuilderUtils
+//                      .newContainer(containerId, BuilderUtils.newNodeId("host"
+//                          + containerId.getId(), 2345),
+//                        "host" + containerId.getId() + ":5678", req
+//                          .getCapability(), req.getPriority(), null));
+//                  }
+//                }
+//
+//                AMResponse amResponse = Records.newRecord(AMResponse.class);
+//                amResponse.setAllocatedContainers(containers);
+//                amResponse.setResponseId(request.getResponseId() + 1);
+//                response.setAMResponse(amResponse);
+//                response.setNumClusterNodes(350);
+//                return response;
+//              }
+//            };
+//          }
+//        };
+//      }
+//    });
+//  }
+//
+//  @Test
+//  public void benchmark2() throws Exception {
+//    int maps = 100; // Adjust for benchmarking; start with a couple of thousand.
+//    int reduces = 50;
+//    int maxConcurrentRunningTasks = 500;
+//    
+//    System.out.println("Running benchmark with throttled running tasks with " +
+//        "maxConcurrentRunningTasks:" + maxConcurrentRunningTasks +
+//        " maps:" + maps + " reduces:" + reduces);
+//    run(new ThrottledMRApp(maps, reduces, maxConcurrentRunningTasks));
+//  }
+//
+//  public static void main(String[] args) throws Exception {
+//    MRAppBenchmark benchmark = new MRAppBenchmark();
+//    benchmark.benchmark1();
+//    benchmark.benchmark2();
+//  }
+//
+//}


