tez-commits mailing list archives

From: acmur...@apache.org
Subject: svn commit: r1457129 [30/38] - in /incubator/tez: ./ tez-ampool/ tez-ampool/src/ tez-ampool/src/main/ tez-ampool/src/main/bin/ tez-ampool/src/main/conf/ tez-ampool/src/main/java/ tez-ampool/src/main/java/org/ tez-ampool/src/main/java/org/apache/ tez-am...
Date: Fri, 15 Mar 2013 21:26:48 GMT
Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/JobsInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/JobsInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/JobsInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/JobsInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "jobs")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class JobsInfo {
+
+  protected ArrayList<JobInfo> job = new ArrayList<JobInfo>();
+
+  public JobsInfo() {
+  } // JAXB needs this
+
+  public void add(JobInfo jobInfo) {
+    job.add(jobInfo);
+  }
+
+  public ArrayList<JobInfo> getJobs() {
+    return job;
+  }
+
+}
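
For context, a minimal sketch (not part of this commit) of how a JAXB-annotated wrapper DAO such as JobsInfo is typically marshalled by hand; it assumes the sibling JobInfo DAO added elsewhere in this change set is on the classpath and that a JAXB runtime is available. The no-arg constructor and @XmlAccessorType(XmlAccessType.FIELD) are what let JAXB instantiate and populate the wrapper reflectively.

import javax.xml.bind.JAXBContext;
import javax.xml.bind.Marshaller;

import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.JobsInfo;

public class JobsInfoMarshalSketch {
  public static void main(String[] args) throws Exception {
    // In the web app this wrapper is populated from the AM's running jobs;
    // it is left empty here purely to show the marshalling mechanics.
    JobsInfo jobs = new JobsInfo();

    JAXBContext ctx = JAXBContext.newInstance(JobsInfo.class);
    Marshaller m = ctx.createMarshaller();
    m.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
    m.marshal(jobs, System.out);   // prints an (empty) <jobs/> element
  }
}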

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/ReduceTaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/ReduceTaskAttemptInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/ReduceTaskAttemptInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/ReduceTaskAttemptInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.yarn.util.Times;
+
+@XmlRootElement(name = "taskAttempt")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class ReduceTaskAttemptInfo extends TaskAttemptInfo {
+
+  protected long shuffleFinishTime;
+  protected long mergeFinishTime;
+  protected long elapsedShuffleTime;
+  protected long elapsedMergeTime;
+  protected long elapsedReduceTime;
+
+  public ReduceTaskAttemptInfo() {
+  }
+
+  public ReduceTaskAttemptInfo(TaskAttempt ta, TaskType type) {
+    super(ta, type, false);
+
+    this.shuffleFinishTime = ta.getShuffleFinishTime();
+    this.mergeFinishTime = ta.getSortFinishTime();
+    this.elapsedShuffleTime = Times.elapsed(this.startTime,
+        this.shuffleFinishTime, false);
+    if (this.elapsedShuffleTime == -1) {
+      this.elapsedShuffleTime = 0;
+    }
+    this.elapsedMergeTime = Times.elapsed(this.shuffleFinishTime,
+        this.mergeFinishTime, false);
+    if (this.elapsedMergeTime == -1) {
+      this.elapsedMergeTime = 0;
+    }
+    this.elapsedReduceTime = Times.elapsed(this.mergeFinishTime,
+        this.finishTime, false);
+    if (this.elapsedReduceTime == -1) {
+      this.elapsedReduceTime = 0;
+    }
+  }
+
+  public long getShuffleFinishTime() {
+    return this.shuffleFinishTime;
+  }
+
+  public long getMergeFinishTime() {
+    return this.mergeFinishTime;
+  }
+
+  public long getElapsedShuffleTime() {
+    return this.elapsedShuffleTime;
+  }
+
+  public long getElapsedMergeTime() {
+    return this.elapsedMergeTime;
+  }
+
+  public long getElapsedReduceTime() {
+    return this.elapsedReduceTime;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.List;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlSeeAlso;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Times;
+
+@XmlRootElement(name = "taskAttempt")
+@XmlSeeAlso({ ReduceTaskAttemptInfo.class })
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskAttemptInfo {
+
+  protected long startTime;
+  protected long finishTime;
+  protected long elapsedTime;
+  protected float progress;
+  protected String id;
+  protected String rack;
+  protected TaskAttemptState state;
+  protected String nodeHttpAddress;
+  protected String diagnostics;
+  protected String type;
+  protected String assignedContainerId;
+
+  @XmlTransient
+  protected ContainerId assignedContainer;
+
+  public TaskAttemptInfo() {
+  }
+
+  public TaskAttemptInfo(TaskAttempt ta, Boolean isRunning) {
+    this(ta, TaskType.MAP, isRunning);
+  }
+
+  public TaskAttemptInfo(TaskAttempt ta, TaskType type, Boolean isRunning) {
+    this.type = type.toString();
+    this.id = MRApps.toString(ta.getID());
+    this.nodeHttpAddress = ta.getNodeHttpAddress();
+    this.startTime = ta.getLaunchTime();
+    this.finishTime = ta.getFinishTime();
+    this.assignedContainerId = ConverterUtils.toString(ta
+        .getAssignedContainerID());
+    this.assignedContainer = ta.getAssignedContainerID();
+    this.progress = ta.getProgress() * 100;
+    this.state = ta.getState();
+    this.elapsedTime = Times
+        .elapsed(this.startTime, this.finishTime, isRunning);
+    if (this.elapsedTime == -1) {
+      this.elapsedTime = 0;
+    }
+    List<String> diagnostics = ta.getDiagnostics();
+    if (diagnostics != null && !diagnostics.isEmpty()) {
+      StringBuffer b = new StringBuffer();
+      for (String diag : diagnostics) {
+        b.append(diag);
+      }
+      this.diagnostics = b.toString();
+    }
+    this.rack = ta.getNodeRackName();
+  }
+
+  public String getAssignedContainerIdStr() {
+    return this.assignedContainerId;
+  }
+
+  public ContainerId getAssignedContainerId() {
+    return this.assignedContainer;
+  }
+
+  public String getState() {
+    return this.state.toString();
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+
+  public long getElapsedTime() {
+    return this.elapsedTime;
+  }
+
+  public String getNode() {
+    return this.nodeHttpAddress;
+  }
+
+  public String getRack() {
+    return this.rack;
+  }
+
+  public String getNote() {
+    return this.diagnostics;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptsInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptsInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptsInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskAttemptsInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "taskAttempts")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskAttemptsInfo {
+
+  protected ArrayList<TaskAttemptInfo> taskAttempt = new ArrayList<TaskAttemptInfo>();
+
+  public TaskAttemptsInfo() {
+  } // JAXB needs this
+
+  public void add(TaskAttemptInfo taskattemptInfo) {
+    taskAttempt.add(taskattemptInfo);
+  }
+
+  public ArrayList<TaskAttemptInfo> getTaskAttempts() {
+    return taskAttempt;
+  }
+
+}
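
For context, a minimal sketch (not part of this commit) of how the web layer might assemble a TaskAttemptsInfo from a Task using the DAOs above; the helper class is hypothetical, and reduce attempts get the richer ReduceTaskAttemptInfo so the shuffle/merge/reduce timings are exposed.

import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app2.job.Task;
import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.ReduceTaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskAttemptInfo;
import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskAttemptsInfo;

public final class TaskAttemptsInfoSketch {

  // Hypothetical helper: collect all attempts of a task into the JAXB wrapper.
  public static TaskAttemptsInfo fromTask(Task task) {
    TaskAttemptsInfo attempts = new TaskAttemptsInfo();
    for (TaskAttempt ta : task.getAttempts().values()) {
      if (task.getType() == TaskType.REDUCE) {
        // Reduce attempts expose shuffleFinishTime, mergeFinishTime, etc.
        attempts.add(new ReduceTaskAttemptInfo(ta, task.getType()));
      } else {
        // Map attempts use the base DAO; "false" means the attempt is not running.
        attempts.add(new TaskAttemptInfo(ta, task.getType(), false));
      }
    }
    return attempts;
  }

  private TaskAttemptsInfoSketch() {
  }
}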

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterGroupInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.mapreduce.Counter;
+import org.apache.hadoop.mapreduce.CounterGroup;
+
+@XmlRootElement
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskCounterGroupInfo {
+
+  protected String counterGroupName;
+  protected ArrayList<TaskCounterInfo> counter;
+
+  public TaskCounterGroupInfo() {
+  }
+
+  public TaskCounterGroupInfo(String name, CounterGroup group) {
+    this.counterGroupName = name;
+    this.counter = new ArrayList<TaskCounterInfo>();
+
+    for (Counter c : group) {
+      TaskCounterInfo cinfo = new TaskCounterInfo(c.getName(), c.getValue());
+      this.counter.add(cinfo);
+    }
+  }
+}
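
For context, a minimal sketch (not part of this commit) of how a set of framework Counters might be flattened into these counter DAOs; the helper class is hypothetical.

import java.util.ArrayList;

import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.v2.app2.webapp.dao.TaskCounterGroupInfo;

public final class TaskCountersSketch {

  // Hypothetical helper: one TaskCounterGroupInfo per counter group.
  public static ArrayList<TaskCounterGroupInfo> fromCounters(Counters counters) {
    ArrayList<TaskCounterGroupInfo> groups = new ArrayList<TaskCounterGroupInfo>();
    for (CounterGroup group : counters) {
      groups.add(new TaskCounterGroupInfo(group.getName(), group));
    }
    return groups;
  }

  private TaskCountersSketch() {
  }
}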

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskCounterInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "counter")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskCounterInfo {
+
+  protected String name;
+  protected long value;
+
+  public TaskCounterInfo() {
+  }
+
+  public TaskCounterInfo(String name, long value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public long getValue() {
+    return value;
+  }
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TaskInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
+
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.yarn.util.Times;
+
+@XmlRootElement(name = "task")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TaskInfo {
+
+  protected long startTime;
+  protected long finishTime;
+  protected long elapsedTime;
+  protected float progress;
+  protected String id;
+  protected TaskState state;
+  protected String type;
+  protected String successfulAttempt;
+
+  @XmlTransient
+  int taskNum;
+
+  @XmlTransient
+  TaskAttempt successful;
+
+  public TaskInfo() {
+  }
+
+  public TaskInfo(Task task) {
+    TaskType ttype = task.getType();
+    this.type = ttype.toString();
+    TaskReport report = task.getReport();
+    this.startTime = report.getStartTime();
+    this.finishTime = report.getFinishTime();
+    this.elapsedTime = Times.elapsed(this.startTime, this.finishTime, false);
+    if (this.elapsedTime == -1) {
+      this.elapsedTime = 0;
+    }
+    this.state = report.getTaskState();
+    this.progress = report.getProgress() * 100;
+    this.id = MRApps.toString(task.getID());
+    this.taskNum = task.getID().getId();
+    this.successful = getSuccessfulAttempt(task);
+    if (successful != null) {
+      this.successfulAttempt = MRApps.toString(successful.getID());
+    } else {
+      this.successfulAttempt = "";
+    }
+  }
+
+  public float getProgress() {
+    return this.progress;
+  }
+
+  public String getState() {
+    return this.state.toString();
+  }
+
+  public String getId() {
+    return this.id;
+  }
+
+  public int getTaskNum() {
+    return this.taskNum;
+  }
+
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public long getFinishTime() {
+    return this.finishTime;
+  }
+
+  public long getElapsedTime() {
+    return this.elapsedTime;
+  }
+
+  public String getSuccessfulAttempt() {
+    return this.successfulAttempt;
+  }
+
+  public TaskAttempt getSuccessful() {
+    return this.successful;
+  }
+
+  private TaskAttempt getSuccessfulAttempt(Task task) {
+    for (TaskAttempt attempt : task.getAttempts().values()) {
+      if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
+        return attempt;
+      }
+    }
+    return null;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/dao/TasksInfo.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.v2.app2.webapp.dao;
+
+import java.util.ArrayList;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlRootElement;
+
+@XmlRootElement(name = "tasks")
+@XmlAccessorType(XmlAccessType.FIELD)
+public class TasksInfo {
+
+  protected ArrayList<TaskInfo> task = new ArrayList<TaskInfo>();
+
+  public TasksInfo() {
+  } // JAXB needs this
+
+  public void add(TaskInfo taskInfo) {
+    task.add(taskInfo);
+  }
+
+  public ArrayList<TaskInfo> getTasks() {
+    return task;
+  }
+
+}

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/app2/webapp/package-info.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.mapreduce.v2.app2.webapp;
+import org.apache.hadoop.classification.InterfaceAudience;

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/common/DiagnosableEvent.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/common/DiagnosableEvent.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/common/DiagnosableEvent.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/hadoop/mapreduce/v2/common/DiagnosableEvent.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.mapreduce.v2.common;
+
+
+public interface DiagnosableEvent {
+  
+  // TODO A bunch of events should be implementing this interface.
+  public String getDiagnosticInfo();
+}
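
For context, a minimal sketch (not part of this commit) of what an event implementing this interface could look like, per the TODO above; the class name and field are purely illustrative.

import org.apache.hadoop.mapreduce.v2.common.DiagnosableEvent;

// Hypothetical example: an event that carries a diagnostic string so that
// downstream handlers can surface it without extra casting.
public class ExampleDiagnosableEvent implements DiagnosableEvent {

  private final String diagnosticInfo;

  public ExampleDiagnosableEvent(String diagnosticInfo) {
    this.diagnosticInfo = diagnosticInfo;
  }

  @Override
  public String getDiagnosticInfo() {
    return diagnosticInfo;
  }
}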

Added: incubator/tez/tez-yarn-application/src/main/java/org/apache/tez/mapreduce/hadoop/TaskAttemptListenerImplTez.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/java/org/apache/tez/mapreduce/hadoop/TaskAttemptListenerImplTez.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/java/org/apache/tez/mapreduce/hadoop/TaskAttemptListenerImplTez.java (added)
+++ incubator/tez/tez-yarn-application/src/main/java/org/apache/tez/mapreduce/hadoop/TaskAttemptListenerImplTez.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,585 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+// TODO TEZ Package does not make a lot of sense.
+package org.apache.tez.mapreduce.hadoop;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskStatus;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskAttemptListener;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.app2.job.Task;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventOutputConsumable;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStatusUpdate.TaskAttemptStatus;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerImpl;
+import org.apache.hadoop.mapreduce.v2.app2.security.authorize.MRAMPolicyProvider;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.service.AbstractService;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.TezTypeConverters;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.apache.tez.records.TezDependentTaskCompletionEvent;
+import org.apache.tez.records.TezJobID;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.apache.tez.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.records.OutputContext;
+
+@SuppressWarnings("unchecked")
+public class TaskAttemptListenerImplTez extends AbstractService implements
+    TezTaskUmbilicalProtocol, TaskAttemptListener {
+
+  private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
+      null, true);
+  
+  private static ProceedToCompletionResponse COMPLETION_RESPONSE_NO_WAIT =
+      new ProceedToCompletionResponse(true, true);
+
+  private static final Log LOG = LogFactory
+      .getLog(TaskAttemptListenerImplTez.class);
+
+  private final AppContext context;
+
+  protected final TaskHeartbeatHandler taskHeartbeatHandler;
+  protected final ContainerHeartbeatHandler containerHeartbeatHandler;
+  private final JobTokenSecretManager jobTokenSecretManager;
+  private InetSocketAddress address;
+  private Server server;
+
+
+  // TODO Use this to figure out whether an incoming ping is valid.
+  private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToContainerIdMap =
+      new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
+  
+  private Set<ContainerId> registeredContainers = Collections
+      .newSetFromMap(new ConcurrentHashMap<ContainerId, Boolean>());
+  
+  public TaskAttemptListenerImplTez(AppContext context,
+      TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
+      JobTokenSecretManager jobTokenSecretManager) {
+    super(TaskAttemptListenerImplTez.class.getName());
+    this.context = context;
+    this.jobTokenSecretManager = jobTokenSecretManager;
+    this.taskHeartbeatHandler = thh;
+    this.containerHeartbeatHandler = chh;
+  }
+
+  @Override
+  public void start() {
+    startRpcServer();
+    super.start();
+  }
+
+  protected void startRpcServer() {
+    Configuration conf = getConfig();
+    try {
+      server = new RPC.Builder(conf)
+          .setProtocol(TezTaskUmbilicalProtocol.class)
+          .setBindAddress("0.0.0.0")
+          .setPort(0)
+          .setInstance(this)
+          .setNumHandlers(
+              conf.getInt(MRJobConfig.MR_AM_TASK_LISTENER_THREAD_COUNT,
+                  MRJobConfig.DEFAULT_MR_AM_TASK_LISTENER_THREAD_COUNT))
+          .setSecretManager(jobTokenSecretManager).build();
+      
+      // Enable service authorization?
+      if (conf.getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
+          false)) {
+        refreshServiceAcls(conf, new MRAMPolicyProvider());
+      }
+
+      server.start();
+      this.address = NetUtils.getConnectAddress(server);
+    } catch (IOException e) {
+      throw new YarnException(e);
+    }
+  }
+
+  void refreshServiceAcls(Configuration configuration, 
+      PolicyProvider policyProvider) {
+    this.server.refreshServiceAcl(configuration, policyProvider);
+  }
+
+  @Override
+  public void stop() {
+    stopRpcServer();
+    super.stop();
+  }
+
+  protected void stopRpcServer() {
+    server.stop();
+  }
+
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+  
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    return versionID;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol,
+        clientVersion, clientMethodsHash);
+  }
+
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      TezJobID jobID, int fromEventIdx, int maxEvents,
+      TezTaskAttemptID taskAttemptID) {
+    LOG.info("Dependency Completion Events request from " + taskAttemptID
+        + ". fromEventID " + fromEventIdx + " maxEvents " + maxEvents);
+
+    // TODO: shouldReset is never used. See TT. Ask for Removal.
+    boolean shouldReset = false;
+    TaskAttemptId mrv2AttemptId = TypeConverter.toYarn(IDConverter
+        .toMRTaskAttemptId(taskAttemptID));
+
+    TaskAttemptCompletionEvent[] events = context.getJob(
+        mrv2AttemptId.getTaskId().getJobId()).getTaskAttemptCompletionEvents(
+        fromEventIdx, maxEvents);
+
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptID);
+
+    // Filter the events to return only map completion events in the Tez format.
+    List<TezDependentTaskCompletionEvent> mapEvents = new ArrayList<TezDependentTaskCompletionEvent>();
+    for (TaskAttemptCompletionEvent event : events) {
+      if (event.getAttemptId().getTaskId().getTaskType() == TaskType.MAP) {
+        mapEvents.add(TezTypeConverters.toTez(event));
+      }
+    }
+
+    return new TezTaskDependencyCompletionEventsUpdate(
+        mapEvents
+            .toArray(new TezDependentTaskCompletionEvent[mapEvents.size()]),
+        shouldReset);
+  }
+
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+
+    ContainerTask task = null;
+
+    if (containerContext == null || containerContext.getContainerId() == null) {
+      LOG.info("Invalid task request with an empty containerContext or containerId");
+      task = TASK_FOR_INVALID_JVM;
+    } else {
+      ContainerId containerId = containerContext.getContainerId();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Container with id: " + containerId + " asked for a task");
+      }
+      if (!registeredContainers.contains(containerId)) {
+        LOG.info("Container with id: " + containerId
+            + " is invalid and will be killed");
+        task = TASK_FOR_INVALID_JVM;
+      } else {
+        pingContainerHeartbeatHandler(containerId);
+        MRTaskContext taskContext = pullTaskAttemptContext(containerId);
+        if (taskContext == null) {
+          LOG.info("No task currently assigned to Container with id: "
+              + containerId);
+        } else {
+          task = new ContainerTask(taskContext, false);
+          TaskAttemptID oldTaskAttemptID = IDConverter
+              .toMRTaskAttemptId(taskContext.getTaskAttemptId());
+          TaskAttemptId mrv2TaskAttemptId = TypeConverter
+              .toYarn(oldTaskAttemptID);
+          context.getEventHandler().handle(
+              new TaskAttemptEventStartedRemotely(mrv2TaskAttemptId, containerId,
+                  context.getApplicationACLs(), context.getAllContainers()
+                      .get(containerId).getShufflePort()));
+          LOG.info("Container with id: " + containerId + " given task: "
+              + taskContext.getTaskAttemptId());
+          registerTaskAttempt(mrv2TaskAttemptId, containerId);
+        }
+      }
+    }
+
+    LOG.info("DEBUG: getTask returning task: " + task);
+    return task;
+  }
+
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskAttemptId,
+      TezTaskStatus taskStatus) throws IOException, InterruptedException {
+    LOG.info("DEBUG: " + "Status update from: " + taskAttemptId);
+    TaskAttemptId mrv2AttemptId = TypeConverter.toYarn(IDConverter
+        .toMRTaskAttemptId(taskAttemptId));
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+    TaskAttemptStatus taskAttemptStatus = new TaskAttemptStatus();
+    taskAttemptStatus.id = mrv2AttemptId;
+    // Task sends the updated progress to the TT.
+    taskAttemptStatus.progress = taskStatus.getProgress();
+    LOG.info("DEBUG: " + "Progress of TaskAttempt " + mrv2AttemptId + " is : "
+        + taskStatus.getProgress());
+
+    // Task sends the updated state-string to the TT.
+    taskAttemptStatus.stateString = taskStatus.getStateString();
+
+    // Set the output-size when map-task finishes. Set by the task itself.
+    // outputSize is never used.
+    taskAttemptStatus.outputSize = taskStatus.getLocalOutputSize();
+
+    // Task sends the updated phase to the TT.
+    taskAttemptStatus.phase = TezTypeConverters.toYarn(taskStatus.getPhase());
+
+    // TODO TEZAM3 - Avoid the 10 layers of conversion.
+    // Counters are updated by the task. Convert counters into new format as
+    // that is the primary storage format inside the AM to avoid multiple
+    // conversions and unnecessary heap usage.
+    taskAttemptStatus.counters = TezTypeConverters.fromTez(taskStatus.getCounters());
+    
+
+    // Map Finish time set by the task (map only)
+    // TODO CLEANTEZAM - maybe differentiate between map / reduce / types
+    if (taskStatus.getMapFinishTime() != 0) {
+      taskAttemptStatus.mapFinishTime = taskStatus.getMapFinishTime();
+    }
+
+    // Shuffle Finish time set by the task (reduce only).
+    if (taskStatus.getShuffleFinishTime() != 0) {
+      taskAttemptStatus.shuffleFinishTime = taskStatus.getShuffleFinishTime();
+    }
+
+    // Sort finish time set by the task (reduce only).
+    if (taskStatus.getSortFinishTime() != 0) {
+      taskAttemptStatus.sortFinishTime = taskStatus.getSortFinishTime();
+    }
+
+    // Not Setting the task state. Used by speculation - will be set in
+    // TaskAttemptImpl
+    // taskAttemptStatus.taskState =
+    // TypeConverter.toYarn(taskStatus.getRunState());
+
+    // set the fetch failures
+    if (taskStatus.getFailedDependencies() != null
+        && taskStatus.getFailedDependencies().size() > 0) {
+      LOG.warn("Failed dependencies are not handled at the moment." +
+          " The job is likely to fail / hang");
+      taskAttemptStatus.fetchFailedMaps = new ArrayList<TaskAttemptId>();
+      for (TezTaskAttemptID failedAttemptId : taskStatus
+          .getFailedDependencies()) {
+        taskAttemptStatus.fetchFailedMaps.add(TezTypeConverters
+            .toYarn(failedAttemptId));
+      }
+    }
+
+    // Task sends the information about the nextRecordRange to the TT
+
+    // TODO: The following are not needed here, but needed to be set somewhere
+    // inside AppMaster.
+    // taskStatus.getRunState(); // Set by the TT/JT. Transform into a state
+    // TODO
+    // taskStatus.getStartTime(); // Used to be set by the TaskTracker. This
+    // should be set by getTask().
+    // taskStatus.getFinishTime(); // Used to be set by TT/JT. Should be set
+    // when task finishes
+    // // This was used by TT to do counter updates only once every minute. So
+    // this
+    // // isn't ever changed by the Task itself.
+    // taskStatus.getIncludeCounters();
+
+    context.getEventHandler().handle(
+        new TaskAttemptEventStatusUpdate(taskAttemptStatus.id,
+            taskAttemptStatus));
+    return true;
+  }
+
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskAttemptId, String trace)
+      throws IOException {
+    LOG.info("Diagnostics report from " + taskAttemptId.toString() + ": "
+        + trace);
+
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    // This is mainly used for cases where we want to propagate exception traces
+    // of tasks that fail.
+
+    // This call exists as a hadoop mapreduce legacy wherein all changes in
+    // counters/progress/phase/output-size are reported through the
+    // statusUpdate() call, but diagnostic information is not.
+    context.getEventHandler().handle(
+        new TaskAttemptEventDiagnosticsUpdate(mrv2AttemptId, trace));
+
+  }
+
+  @Override
+  public boolean ping(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Ping from " + taskAttemptId.toString());
+    taskHeartbeatHandler.pinged(TezTypeConverters.toYarn(taskAttemptId));
+    pingContainerHeartbeatHandler(taskAttemptId);
+    return true;
+  }
+
+  @Override
+  public void done(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Done acknowledgement from " + taskAttemptId.toString());
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(mrv2AttemptId, TaskAttemptEventType.TA_DONE));
+
+  }
+
+  /**
+   * TaskAttempt is reporting that it is in commit_pending and is waiting for
+   * the commit response.
+   *
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public void commitPending(TezTaskAttemptID taskAttemptId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Commit-pending state update from " + taskAttemptId.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    
+    
+
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+    //Ignorable TaskStatus? - since a task will send a LastStatusUpdate
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(mrv2AttemptId, 
+            TaskAttemptEventType.TA_COMMIT_PENDING));
+  }
+
+  /**
+   * Child checking whether it can commit.
+   * 
+   * <br/>
+   * Commit is a two-phased protocol. First the attempt informs the
+   * ApplicationMaster that it is
+   * {@link #commitPending(TaskAttemptID, TaskStatus)}. Then it repeatedly polls
+   * the ApplicationMaster whether it {@link #canCommit(TaskAttemptID)}. This is
+   * a legacy from the centralized commit protocol handling by the JobTracker.
+   */
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
+    LOG.info("Commit go/no-go request from " + taskAttemptId.toString());
+    // An attempt is asking if it can commit its output. This can be decided
+    // only by the task which is managing the multiple attempts. So redirect the
+    // request there.
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+
+    taskHeartbeatHandler.progressing(mrv2AttemptId);
+    pingContainerHeartbeatHandler(taskAttemptId);
+
+    Job job = context.getJob(mrv2AttemptId.getTaskId().getJobId());
+    Task task = job.getTask(mrv2AttemptId.getTaskId());
+    return task.canCommit(mrv2AttemptId);
+  }
+
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    // TODO: This isn't really used in any MR code. Ask for removal.
+  }
+
+  @Override
+  public void fsError(TezTaskAttemptID taskAttemptId, String message)
+      throws IOException {
+    // This happens only in Child.
+    LOG.fatal("Task: " + taskAttemptId + " - failed due to FSError: " + message);
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    reportDiagnosticInfo(taskAttemptId, "FSError: " + message);
+
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(mrv2AttemptId, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void fatalError(TezTaskAttemptID taskAttemptId, String message)
+      throws IOException {
+    // This happens only in Child and in the Task.
+    LOG.fatal("Task: " + taskAttemptId + " - exited : " + message);
+    reportDiagnosticInfo(taskAttemptId, "Error: " + message);
+
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    context.getEventHandler().handle(
+        new TaskAttemptEvent(mrv2AttemptId, TaskAttemptEventType.TA_FAILED));
+  }
+
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("AttemptId: " + taskAttemptId + " reported output context: "
+          + outputContext);
+    }
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    context.getEventHandler().handle(new TaskAttemptEventOutputConsumable(mrv2AttemptId, outputContext));
+  }
+
+  @Override
+  public ProceedToCompletionResponse
+      proceedToCompletion(TezTaskAttemptID taskAttemptId) throws IOException {
+    
+    // The async nature of the processing combined with the 1 second interval
+    // between polls (MRTask) implies tasks end up wasting up to 1 second doing
+    // nothing. Similarly for CA_COMMIT.
+    
+    TaskAttemptId mrv2AttemptId = TezTypeConverters.toYarn(taskAttemptId);
+    Job job = context.getJob(mrv2AttemptId.getTaskId().getJobId());
+    Task task = job.getTask(mrv2AttemptId.getTaskId());
+    
+    if (task.needsWaitAfterOutputConsumable()) {
+      TaskAttemptId outputReadyAttempt = task.getOutputConsumableAttempt();
+      if (outputReadyAttempt != null) {
+        if (!outputReadyAttempt.equals(mrv2AttemptId)) {
+          LOG.info("Telling taksAttemptId: "
+              + mrv2AttemptId
+              + " to die, since the outputReady atempt for this task is different: "
+              + outputReadyAttempt);
+          return new ProceedToCompletionResponse(true, true);
+        }
+      }
+      boolean reducesDone = true;
+      for (Task rTask : job.getTasks(TaskType.REDUCE).values()) {
+        if (rTask.getState() != TaskState.SUCCEEDED) {
+          // TODO EVENTUALLY - could let the map tasks exit after reduces are
+          // done with the shuffle phase, instead of waiting for the reduces to
+          // complete.
+          reducesDone = false;
+          break;
+        }
+      }
+      if (reducesDone) {
+        return new ProceedToCompletionResponse(false, true);
+      } else {
+        return new ProceedToCompletionResponse(false, false);
+      }
+    } else {
+      return COMPLETION_RESPONSE_NO_WAIT;
+    }
+  }
+  
+  
+  
+  // TODO EVENTUALLY remove all mrv2 ids.
+  @Override
+  public void unregisterTaskAttempt(TaskAttemptId attemptId) {
+    attemptToContainerIdMap.remove(IDConverter
+        .fromMRTaskAttemptId(TypeConverter.fromYarn(attemptId)));
+  }
+
+  public MRTaskContext pullTaskAttemptContext(ContainerId containerId) {
+    AMContainerImpl container = (AMContainerImpl) context.getAllContainers()
+        .get(containerId);
+    return container.pullTaskContext();
+  }
+
+  @Override
+  public void registerRunningContainer(ContainerId containerId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("ContainerId: " + containerId
+          + " registered with TaskAttemptListener");
+    }
+    registeredContainers.add(containerId);
+  }
+
+  @Override
+  public void registerTaskAttempt(TaskAttemptId attemptId,
+      ContainerId containerId) {
+    attemptToContainerIdMap.put(
+        IDConverter.fromMRTaskAttemptId(TypeConverter.fromYarn(attemptId)),
+        containerId);
+  }
+
+  @Override
+  public void unregisterRunningContainer(ContainerId containerId) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Unregistering Container from TaskAttemptListener: "
+          + containerId);
+    }
+    registeredContainers.remove(containerId);
+  }
+  
+  private void pingContainerHeartbeatHandler(ContainerId containerId) {
+    containerHeartbeatHandler.pinged(containerId);
+  }
+  
+  private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
+    ContainerId containerId = attemptToContainerIdMap.get(taskAttemptId);
+    if (containerId != null) {
+      containerHeartbeatHandler.pinged(containerId);
+    } else {
+      LOG.warn("Handling communication from attempt: " + taskAttemptId
+          + ", ContainerId not known for this attempt");
+    }
+  }
+}
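
For context, a hedged sketch (not part of this commit) of the call sequence a child container makes against this listener over the umbilical protocol. How the child actually obtains the proxy and drives its task loop lives elsewhere in the Tez runtime, so the RPC setup and the second ContainerContext argument below are illustrative only.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.tez.mapreduce.hadoop.ContainerContext;
import org.apache.tez.mapreduce.hadoop.ContainerTask;
import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;

public final class UmbilicalClientSketch {

  // Hypothetical driver: ask the AM for work, run it, then acknowledge completion.
  public static void runOnce(InetSocketAddress amAddress, ContainerId containerId,
      Configuration conf) throws Exception {
    TezTaskUmbilicalProtocol umbilical = RPC.getProxy(
        TezTaskUmbilicalProtocol.class, TezTaskUmbilicalProtocol.versionID,
        amAddress, conf);
    try {
      // getTask() returns null while no attempt is assigned, and a
      // "should die" ContainerTask if the container is not registered.
      ContainerTask task = umbilical.getTask(new ContainerContext(containerId, ""));
      if (task == null || task.shouldDie()) {
        return;
      }
      // ... execute task.getMrTaskContext() here, calling
      // umbilical.statusUpdate(...) and umbilical.ping(...) periodically ...
      umbilical.done(task.getMrTaskContext().getTaskAttemptId());
    } finally {
      RPC.stopProxy(umbilical);
    }
  }

  private UmbilicalClientSketch() {
  }
}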

Added: incubator/tez/tez-yarn-application/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo (added)
+++ incubator/tez/tez-yarn-application/src/main/resources/META-INF/services/org.apache.hadoop.security.SecurityInfo Fri Mar 15 21:26:36 2013
@@ -0,0 +1 @@
+org.apache.hadoop.mapreduce.v2.app2.MRClientSecurityInfo

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptListenerImpl.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,153 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.jobhistory.ContainerHeartbeatHandler;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.TaskHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app2.job.event.TaskAttemptEventStartedRemotely;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerImpl;
+import org.apache.hadoop.mapreduce.v2.app2.rm.container.AMContainerMap;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.ContainerTask;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.hadoop.TaskAttemptListenerImplTez;
+import org.apache.tez.mapreduce.task.impl.MRTaskContext;
+import org.apache.tez.records.TezTaskAttemptID;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestTaskAttemptListenerImpl {
+  public static class MockTaskAttemptListenerImpl extends TaskAttemptListenerImplTez {
+
+    public MockTaskAttemptListenerImpl(AppContext context,
+        JobTokenSecretManager jobTokenSecretManager,
+        ContainerHeartbeatHandler chh, TaskHeartbeatHandler thh) {
+      super(context, thh, chh, jobTokenSecretManager);
+    }
+
+    @Override
+    protected void startRpcServer() {
+      //Empty
+    }
+    
+    @Override
+    protected void stopRpcServer() {
+      //Empty
+    }
+  }
+  
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testGetTask() throws IOException {
+    AppContext appCtx = mock(AppContext.class);
+    EventHandler mockHandler = mock(EventHandler.class);
+    AMContainerMap amContainers = mock(AMContainerMap.class);
+    
+    // Request to get a task for Container1 returns null.
+    ContainerId containerId1 = BuilderUtils.newContainerId(1, 1, 1, 1);
+    AMContainerImpl amContainer1 = mock(AMContainerImpl.class);
+    when(amContainer1.pullTaskContext()).thenReturn(null);
+    when(amContainers.get(containerId1)).thenReturn(amContainer1);
+    
+    MRTaskContext task = mock(MRTaskContext.class);
+    TezTaskAttemptID taId = new TezTaskAttemptID("1", 1, MRTaskType.MAP.toString(), 1, 1);
+    when(task.getTaskAttemptId()).thenReturn(taId);
+    
+    
+    // Request to get a task for Container2 returns task.
+    ContainerId containerId2 = BuilderUtils.newContainerId(1, 1,1,2);
+    AMContainerImpl amContainer2 = mock(AMContainerImpl.class);
+    when(amContainer2.pullTaskContext()).thenReturn(task);
+    when(amContainers.get(containerId2)).thenReturn(amContainer2);
+    
+    when(appCtx.getAllContainers()).thenReturn(amContainers);
+    when(appCtx.getEventHandler()).thenReturn(mockHandler);
+    
+    JobTokenSecretManager secret = mock(JobTokenSecretManager.class); 
+    TaskHeartbeatHandler thh = mock(TaskHeartbeatHandler.class);
+    ContainerHeartbeatHandler chh = mock(ContainerHeartbeatHandler.class);
+    
+    MockTaskAttemptListenerImpl listener = 
+      new MockTaskAttemptListenerImpl(appCtx, secret, chh, thh);
+    Configuration conf = new Configuration();
+    listener.init(conf);
+    listener.start();
+
+    ContainerContext context1 = new ContainerContext(containerId1, "");
+    
+    ContainerContext context2 = new ContainerContext(containerId2, "");
+    ContainerTask result = null;
+    
+    // Verify ask before registration.
+    // The container has not been registered yet, so it should be told to die.
+    result = listener.getTask(context1);
+    assertNotNull(result);
+    assertTrue(result.shouldDie());
+    verify(chh, never()).pinged(any(ContainerId.class));
+
+    // Verify ask after container registration, but before the container is assigned a task.
+    listener.registerRunningContainer(containerId1);
+    result = listener.getTask(context1);
+    assertNull(result);
+    verify(chh, times(1)).pinged(any(ContainerId.class));
+    
+    // Verify ask after container registration, when the container has a task.
+    listener.registerRunningContainer(containerId2);
+    result = listener.getTask(context2);
+    assertNotNull(result);
+    assertFalse(result.shouldDie());
+    assertTrue(result.getMrTaskContext() == task);
+    verify(chh, times(2)).pinged(any(ContainerId.class));
+    ArgumentCaptor<Event> ac = ArgumentCaptor.forClass(Event.class);
+    verify(mockHandler).handle(ac.capture());
+    Event cEvent = ac.getValue();
+    assertTrue(cEvent.getClass().equals(TaskAttemptEventStartedRemotely.class));
+    TaskAttemptEventStartedRemotely event = (TaskAttemptEventStartedRemotely)cEvent; 
+    assertTrue(event.getType() == TaskAttemptEventType.TA_STARTED_REMOTELY);
+    assertTrue(event.getContainerId().equals(containerId2));
+    
+    // Verify ask after the container is unregistered.
+    listener.unregisterRunningContainer(containerId1);
+    result = listener.getTask(context1);
+    assertNotNull(result);
+    assertTrue(result.shouldDie());
+
+    listener.stop();
+  }
+}

Added: incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1457129&view=auto
==============================================================================
--- incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (added)
+++ incubator/tez/tez-yarn-application/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Mar 15 21:26:36 2013
@@ -0,0 +1,395 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.jobhistory;
+
+import static junit.framework.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.*;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.app2.AppContext;
+import org.apache.hadoop.mapreduce.v2.app2.job.Job;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.YarnException;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.util.BuilderUtils;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestJobHistoryEventHandler {
+
+
+  private static final Log LOG = LogFactory
+      .getLog(TestJobHistoryEventHandler.class);
+
+  @Test
+  public void testFirstFlushOnCompletionEvent() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 200);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0; i < 100; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskStartedEvent(
+            t.taskID, 0, TaskType.MAP, "")));
+      }
+      handleNextNEvents(jheh, 100);
+      verify(mockWriter, times(0)).flush();
+
+      // First completion event, but min-queue-size for batching flushes is 10
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+          t.taskID, null, 0, TaskType.MAP, "", null)));
+      verify(mockWriter).flush();
+
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
+  @Test
+  public void testMaxUnflushedCompletionEvents() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 1);
+      verify(mockWriter).flush();
+      
+      handleNextNEvents(jheh, 50);
+      verify(mockWriter, times(6)).flush();
+      
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+  
+  @Test
+  public void testUnflushedTimer() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        2 * 1000l); //2 seconds.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 10);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 100);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 5);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+
+      handleNextNEvents(jheh, 9);
+      verify(mockWriter, times(0)).flush();
+
+      Thread.sleep(2 * 4 * 1000l); // 4 seconds should be enough. Just be safe.
+      verify(mockWriter).flush();
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+  
+  @Test
+  public void testBatchedFlushJobEndMultiplier() throws Exception {
+    TestParams t = new TestParams();
+    Configuration conf = new Configuration();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.workDir);
+    conf.setLong(MRJobConfig.MR_AM_HISTORY_COMPLETE_EVENT_FLUSH_TIMEOUT_MS,
+        60 * 1000l); // 60 seconds.
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_JOB_COMPLETE_UNFLUSHED_MULTIPLIER, 3);
+    conf.setInt(MRJobConfig.MR_AM_HISTORY_MAX_UNFLUSHED_COMPLETE_EVENTS, 10);
+    conf.setInt(
+        MRJobConfig.MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD, 0);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    EventWriter mockWriter = null;
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+      mockWriter = jheh.getEventWriter();
+      verify(mockWriter).write(any(HistoryEvent.class));
+
+      for (int i = 0 ; i < 100 ; i++) {
+        queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
+            t.taskID, null, 0, TaskType.MAP, "", null)));
+      }
+      queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
+
+      handleNextNEvents(jheh, 29);
+      verify(mockWriter, times(0)).flush();
+
+      handleNextNEvents(jheh, 72);
+      verify(mockWriter, times(4)).flush(); // 3 flushes of 30 batched completion events each, + 1 for JobFinished
+    } finally {
+      jheh.stop();
+      verify(mockWriter).close();
+    }
+  }
+
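+  // Helpers: queueEvent only enqueues; handleEvent and handleNextNEvents drain
+  // the handler's internal event queue synchronously, keeping flush counts deterministic.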
+  private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
+    jheh.handle(event);
+  }
+
+  private void handleEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event)
+      throws InterruptedException {
+    jheh.handle(event);
+    jheh.handleEvent(jheh.eventQueue.take());
+  }
+
+  private void handleNextNEvents(JHEvenHandlerForTest jheh, int numEvents)
+      throws InterruptedException {
+    for (int i = 0; i < numEvents; i++) {
+      jheh.handleEvent(jheh.eventQueue.take());
+    }
+  }
+
+  private String setupTestWorkDir() {
+    File testWorkDir = new File("target", this.getClass().getCanonicalName());
+    try {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testWorkDir.getAbsolutePath()), true);
+      return testWorkDir.getAbsolutePath();
+    } catch (Exception e) {
+      LOG.warn("Could not cleanup", e);
+      throw new YarnException("could not cleanup test dir", e);
+    }
+  }
+
+  private AppContext mockAppContext(JobId jobId, ApplicationId appId) {
+    AppContext mockContext = mock(AppContext.class);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getTotalMaps()).thenReturn(10);
+    when(mockJob.getTotalReduces()).thenReturn(10);
+    when(mockJob.getName()).thenReturn("mockjob");
+    when(mockContext.getJob(jobId)).thenReturn(mockJob);
+    doReturn(appId).when(mockContext).getApplicationID();
+    return mockContext;
+  }
+  
+
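+  // Bundles the staging dir, application/container ids and mocked AppContext shared by the flush tests.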
+  private class TestParams {
+    String workDir = setupTestWorkDir();
+    ApplicationId appId = BuilderUtils.newApplicationId(200, 1);
+    ApplicationAttemptId appAttemptId =
+        BuilderUtils.newApplicationAttemptId(appId, 1);
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 1);
+    TaskID taskID = TaskID.forName("task_200707121733_0003_m_000005");
+    JobId jobId = MRBuilderUtils.newJobId(appId, 1);
+    AppContext mockAppContext = mockAppContext(jobId, appId);
+  }
+
+  private JobHistoryEvent getEventToEnqueue(JobId jobId) {
+    JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
+    HistoryEvent he = Mockito.mock(HistoryEvent.class);
+    Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
+    Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
+    Mockito.when(toReturn.getJobID()).thenReturn(jobId);
+    return toReturn;
+  }
+
+  /**
+   * Tests that in case of SIGTERM, the JHEH stops without processing its event
+   * queue (because we must stop quickly lest we get SIGKILLed) and processes
+   * a JobUnsuccessfulCompletionEvent for jobs which were still running (so
+   * that they may show up in the JobHistoryServer).
+   */
+  @Test
+  public void testSigTermedFunctionality() throws IOException {
+    AppContext mockedContext = Mockito.mock(AppContext.class);
+    JHEventHandlerForSigtermTest jheh =
+      new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    JobId jobId = Mockito.mock(JobId.class);
+    jheh.addToFileMap(jobId);
+
+    //Submit 4 events and check that they're handled in the absence of a signal
+    final int numEvents = 4;
+    JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    //Make sure events were handled
+    assertTrue("handleEvent should've been called only 4 times but was "
+      + jheh.eventsHandled, jheh.eventsHandled == 4);
+
+    //Create a new jheh because the last stop closed the eventWriter etc.
+    jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+    // Make constructor of JobUnsuccessfulCompletionEvent pass
+    Job job = Mockito.mock(Job.class);
+    Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
+    // Make TypeConverter(JobID) pass
+    ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
+    Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
+    Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
+
+    jheh.addToFileMap(jobId);
+    jheh.setSignalled(true);
+    for(int i=0; i < numEvents; ++i) {
+      events[i] = getEventToEnqueue(jobId);
+      jheh.handle(events[i]);
+    }
+    jheh.stop();
+    //Make sure events were handled, 4 + 1 finish event
+    assertTrue("handleEvent should've been called only 5 times but was "
+        + jheh.eventsHandled, jheh.eventsHandled == 5);
+    assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+        jheh.lastEventHandled.getHistoryEvent()
+        instanceof JobUnsuccessfulCompletionEvent);
+  }
+}
+
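+/**
+ * Handler subclass that swaps the real EventWriter for a Mockito mock and
+ * skips the event-dispatch thread, so tests can verify write/flush/close calls.
+ */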
+class JHEvenHandlerForTest extends JobHistoryEventHandler2 {
+
+  private EventWriter eventWriter;
+  volatile int handleEventCompleteCalls = 0;
+  volatile int handleEventStartedCalls = 0;
+
+  public JHEvenHandlerForTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  @Override
+  public void start() {
+  }
+  
+  @Override
+  protected EventWriter createEventWriter(Path historyFilePath)
+      throws IOException {
+    this.eventWriter = mock(EventWriter.class);
+    return this.eventWriter;
+  }
+
+  @Override
+  protected void closeEventWriter(JobId jobId) {
+  }
+  
+  public EventWriter getEventWriter() {
+    return this.eventWriter;
+  }
+}
+
+/**
+ * Class to help with testSigTermedFunctionality
+ */
+class JHEventHandlerForSigtermTest extends JobHistoryEventHandler2 {
+  private MetaInfo metaInfo;
+  public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
+    super(context, startCount);
+  }
+
+  public void addToFileMap(JobId jobId) {
+    metaInfo = Mockito.mock(MetaInfo.class);
+    Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
+    fileMap.put(jobId, metaInfo);
+  }
+
+  JobHistoryEvent lastEventHandled;
+  int eventsHandled = 0;
+  @Override
+  protected void handleEvent(JobHistoryEvent event) {
+    this.lastEventHandled = event;
+    this.eventsHandled++;
+  }
+}


