tez-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hit...@apache.org
Subject tez git commit: TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. (hitesh)
Date Mon, 25 Apr 2016 18:05:45 GMT
Repository: tez
Updated Branches:
  refs/heads/branch-0.8 a425f5d91 -> 4694a9efd


TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts. (hitesh)

(cherry picked from commit 7e9ed83184e1fefb4ecf8b560a60d2521c0f30e0)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4694a9ef
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4694a9ef
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4694a9ef

Branch: refs/heads/branch-0.8
Commit: 4694a9efd98f779f696a3c34a354e3e7bb8dc7bf
Parents: a425f5d
Author: Hitesh Shah <hitesh@apache.org>
Authored: Mon Apr 25 11:04:41 2016 -0700
Committer: Hitesh Shah <hitesh@apache.org>
Committed: Mon Apr 25 11:05:41 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../org/apache/tez/common/ATSConstants.java     |  7 ++
 tez-dag/findbugs-exclude.xml                    |  2 +
 .../java/org/apache/tez/dag/app/AppContext.java |  6 ++
 .../org/apache/tez/dag/app/DAGAppMaster.java    | 21 +++++
 .../tez/dag/app/TaskCommunicatorManager.java    | 30 ++++++
 .../app/TaskCommunicatorManagerInterface.java   |  8 ++
 .../tez/dag/app/TaskCommunicatorWrapper.java    | 12 +++
 .../java/org/apache/tez/dag/app/dag/Vertex.java |  3 +
 .../tez/dag/app/dag/impl/ServicePluginInfo.java | 99 ++++++++++++++++++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 55 +++++++----
 .../apache/tez/dag/app/dag/impl/VertexImpl.java | 30 +++++-
 .../app/launcher/ContainerLauncherManager.java  |  4 +
 .../tez/dag/app/rm/TaskSchedulerManager.java    |  5 +
 .../dag/history/events/VertexFinishedEvent.java | 15 ++-
 .../history/events/VertexInitializedEvent.java  | 14 ++-
 .../impl/HistoryEventJsonConversion.java        |  8 ++
 .../apache/tez/dag/history/utils/DAGUtils.java  | 29 ++++++
 .../serviceplugins/api/TaskCommunicator.java    | 22 +++++
 .../apache/tez/dag/app/TestRecoveryParser.java  | 15 +--
 .../tez/dag/app/dag/impl/TestDAGImpl.java       |  5 +-
 .../tez/dag/app/dag/impl/TestDAGRecovery.java   | 12 +--
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 13 ++-
 .../tez/dag/app/dag/impl/TestVertexImpl.java    |  4 +
 .../TestHistoryEventsProtoConversion.java       |  6 +-
 .../impl/TestHistoryEventJsonConversion.java    |  4 +-
 .../ats/HistoryEventTimelineConversion.java     | 16 +++-
 .../ats/TestHistoryEventTimelineConversion.java | 59 +++++++++++-
 .../java/org/apache/tez/test/TestRecovery.java  | 32 +++----
 29 files changed, 468 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a0a5ce6..b6c5520 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,7 @@ Release 0.8.4: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-3219. Allow service plugins to define log locations link for remotely run task attempts.
   TEZ-3224. User payload is not initialized before creating vertex manager plugin.
   TEZ-3226. Tez UI 2: All DAGs UX improvements.
   TEZ-3077. TezClient.waitTillReady should support timeout.

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 0b8c67d..c56582c 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -90,6 +90,13 @@ public class ATSConstants {
   public static final String LAST_DATA_EVENTS = "lastDataEvents";
   public static final String UPDATED_EDGE_MANAGERS = "updatedEdgeManagers";
   public static final String CREATION_CAUSAL_ATTEMPT = "creationCausalAttempt";
+  public static final String TASK_COMMUNICATOR_NAME = "taskCommunicatorName";
+  public static final String TASK_SCHEDULER_NAME = "taskSchedulerName";
+  public static final String CONTAINER_LAUNCHER_NAME = "containerLauncherName";
+  public static final String TASK_COMMUNICATOR_CLASS_NAME = "taskCommunicatorClassName";
+  public static final String TASK_SCHEDULER_CLASS_NAME = "taskSchedulerClassName";
+  public static final String CONTAINER_LAUNCHER_CLASS_NAME = "containerLauncherClassName";
+  public static final String SERVICE_PLUGIN = "servicePlugin";
 
   /* Counters-related keys */
   public static final String COUNTER_GROUPS = "counterGroups";

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index f106c8b..0f3cdca 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -163,6 +163,8 @@
       <Field name="recoveryEnabled"/>
       <Field name="isLocal"/>
       <Field name="hadoopShim"/>
+      <Field name="containerLauncherManager"/>
+      <Field name="taskCommunicatorManager"/>
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC"/>
   </Match>

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
index 30716da..45ce8c1 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/AppContext.java
@@ -94,6 +94,8 @@ public interface AppContext {
 
   TaskSchedulerManager getTaskScheduler();
 
+  TaskCommunicatorManagerInterface getTaskCommunicatorManager();
+
   boolean isSession();
 
   boolean isLocal();
@@ -127,6 +129,10 @@ public interface AppContext {
   public String getTaskSchedulerName(int schedulerId);
   public String getContainerLauncherName(int launcherId);
 
+  public String getTaskCommunicatorClassName(int taskCommId);
+  public String getTaskSchedulerClassName(int schedulerId);
+  public String getContainerLauncherClassName(int launcherId);
+
   public HadoopShim getHadoopShim();
 
   public DAGRecoveryData getDAGRecoveryData();

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
index 7d28497..7ca7118 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/DAGAppMaster.java
@@ -1589,6 +1589,11 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public TaskCommunicatorManagerInterface getTaskCommunicatorManager() {
+      return taskCommunicatorManager;
+    }
+
+    @Override
     public boolean isSession() {
       return isSession;
     }
@@ -1680,6 +1685,22 @@ public class DAGAppMaster extends AbstractService {
     }
 
     @Override
+    public String getTaskCommunicatorClassName(int taskCommId) {
+      return taskCommunicatorManager.getTaskCommunicatorClassName(taskCommId);
+    }
+
+    @Override
+    public String getTaskSchedulerClassName(int schedulerId) {
+      return taskSchedulerManager.getTaskSchedulerClassName(schedulerId);
+    }
+
+    @Override
+    public String getContainerLauncherClassName(int launcherId) {
+      return containerLauncherManager.getContainerLauncherClassName(launcherId);
+    }
+
+
+    @Override
     public HadoopShim getHadoopShim() {
       return hadoopShim;
     }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
index cfb177b..c9d1f2e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManager.java
@@ -650,4 +650,34 @@ public class TaskCommunicatorManager extends AbstractService implements
   private void sendEvent(Event<?> event) {
     context.getEventHandler().handle(event);
   }
+
+  @Override
+  public String getTaskCommunicatorClassName(int taskCommId) {
+    return taskCommunicators[taskCommId].getTaskCommunicator().getClass().getName();
+  }
+
+  @Override
+  public String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) {
+    try {
+      return taskCommunicators[taskCommId].getInProgressLogsUrl(attemptID, containerNodeId);
+    } catch (Exception e) {
+      LOG.warn("Failed to retrieve InProgressLogsUrl from TaskCommunicator,"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", attemptId=" + attemptID, e);
+    }
+    return null;
+  }
+
+  @Override
+  public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) {
+    try {
+      return taskCommunicators[taskCommId].getCompletedLogsUrl(attemptID, containerNodeId);
+    } catch (Exception e) {
+      LOG.warn("Failed to retrieve CompletedLogsUrl from TaskCommunicator,"
+          + ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+          + ", attemptId=" + attemptID, e);
+    }
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
index e0f9852..254e74c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorManagerInterface.java
@@ -19,6 +19,7 @@
 package org.apache.tez.dag.app;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.serviceplugins.api.ContainerEndReason;
 import org.apache.tez.serviceplugins.api.DagInfo;
 import org.apache.tez.serviceplugins.api.ServicePluginError;
@@ -46,4 +47,11 @@ public interface TaskCommunicatorManagerInterface {
   TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex);
 
   void reportError(int taskCommIndex, ServicePluginError servicePluginError, String diagnostics, DagInfo dagName);
+
+  String getTaskCommunicatorClassName(int taskCommId);
+
+  String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId);
+
+  String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId);
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
index 4a75875..4afb427 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskCommunicatorWrapper.java
@@ -21,6 +21,7 @@ import java.util.Map;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.serviceplugins.api.TaskCommunicator;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -80,4 +81,15 @@ public class TaskCommunicatorWrapper {
   public TaskCommunicator getTaskCommunicator() {
     return real;
   }
+
+  public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId)
+      throws Exception {
+    return real.getInProgressLogsUrl(attemptID, containerNodeId);
+  }
+
+  public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId)
+      throws Exception {
+    return real.getCompletedLogsUrl(attemptID, containerNodeId);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index 25fbf3a..1b3b39c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.TaskAttemptEventInfo;
 import org.apache.tez.dag.app.dag.event.SpeculatorEvent;
 import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
 import org.apache.tez.dag.app.dag.impl.Edge;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.dag.records.TezTaskID;
 import org.apache.tez.dag.records.TezVertexID;
@@ -190,6 +191,8 @@ public interface Vertex extends Comparable<Vertex> {
   public int getContainerLauncherIdentifier();
   public int getTaskCommunicatorIdentifier();
 
+  public ServicePluginInfo getServicePluginInfo();
+
   public long getInitTime();
   public long getStartTime();
   public long getFinishTime();

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
new file mode 100644
index 0000000..2ff1ee5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/ServicePluginInfo.java
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.app.dag.impl;
+
+
+public class ServicePluginInfo {
+
+  private String containerLauncherName;
+  private String taskSchedulerName;
+  private String taskCommunicatorName;
+  private String containerLauncherClassName;
+  private String taskSchedulerClassName;
+  private String taskCommunicatorClassName;
+
+  public ServicePluginInfo() {
+  }
+
+  public String getContainerLauncherName() {
+    return containerLauncherName;
+  }
+
+  public ServicePluginInfo setContainerLauncherName(String containerLauncherName) {
+    this.containerLauncherName = containerLauncherName;
+    return this;
+  }
+
+  public String getTaskSchedulerName() {
+    return taskSchedulerName;
+  }
+
+  public ServicePluginInfo setTaskSchedulerName(String taskSchedulerName) {
+    this.taskSchedulerName = taskSchedulerName;
+    return this;
+  }
+
+  public String getTaskCommunicatorName() {
+    return taskCommunicatorName;
+  }
+
+  public ServicePluginInfo setTaskCommunicatorName(String taskCommunicatorName) {
+    this.taskCommunicatorName = taskCommunicatorName;
+    return this;
+  }
+
+  public String getContainerLauncherClassName() {
+    return containerLauncherClassName;
+  }
+
+  public ServicePluginInfo setContainerLauncherClassName(String containerLauncherClassName) {
+    this.containerLauncherClassName = containerLauncherClassName;
+    return this;
+  }
+
+  public String getTaskSchedulerClassName() {
+    return taskSchedulerClassName;
+  }
+
+  public ServicePluginInfo setTaskSchedulerClassName(String taskSchedulerClassName) {
+    this.taskSchedulerClassName = taskSchedulerClassName;
+    return this;
+  }
+
+  public String getTaskCommunicatorClassName() {
+    return taskCommunicatorClassName;
+  }
+
+  public ServicePluginInfo setTaskCommunicatorClassName(String taskCommunicatorClassName) {
+    this.taskCommunicatorClassName = taskCommunicatorClassName;
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    return "ServicePluginInfo {" +
+        "containerLauncherName=" + containerLauncherName +
+        ", taskSchedulerName=" + taskSchedulerName +
+        ", taskCommunicatorName=" + taskCommunicatorName +
+        ", containerLauncherClassName=" + containerLauncherClassName +
+        ", taskSchedulerClassName=" + taskSchedulerClassName +
+        ", taskCommunicatorClassName=" + taskCommunicatorClassName +
+        " }";
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 6169a7b..6d9247e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
@@ -1107,31 +1108,49 @@ public class TaskAttemptImpl implements TaskAttempt,
 
   private String getInProgressLogsUrl() {
     String inProgressLogsUrl = null;
-    if (containerId != null && nodeHttpAddress != null) {
-      final String containerIdStr = containerId.toString();
-      inProgressLogsUrl = nodeHttpAddress
-          + "/" + "node/containerlogs"
-          + "/" + containerIdStr
-          + "/" + this.appContext.getUser();
+    if (getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+          TezConstants.getTezYarnServicePluginName())
+        || getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+          TezConstants.getTezUberServicePluginName())) {
+      if (containerId != null && nodeHttpAddress != null) {
+        final String containerIdStr = containerId.toString();
+        inProgressLogsUrl = nodeHttpAddress
+            + "/" + "node/containerlogs"
+            + "/" + containerIdStr
+            + "/" + this.appContext.getUser();
+      }
+    } else {
+      inProgressLogsUrl = appContext.getTaskCommunicatorManager().getInProgressLogsUrl(
+          getVertex().getTaskCommunicatorIdentifier(),
+          attemptId, containerNodeId);
     }
     return inProgressLogsUrl;
   }
 
   private String getCompletedLogsUrl() {
     String completedLogsUrl = null;
-    if (containerId != null && containerNodeId != null && nodeHttpAddress != null) {
-      final String containerIdStr = containerId.toString();
-      if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
-          YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
-          && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
-        String contextStr = "v_" + getVertex().getName()
-            + "_" + this.attemptId.toString();
-        completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
-            + "/" + containerNodeId.toString()
-            + "/" + containerIdStr
-            + "/" + contextStr
-            + "/" + this.appContext.getUser();
+    if (getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+          TezConstants.getTezYarnServicePluginName())
+        || getVertex().getServicePluginInfo().getContainerLauncherName().equals(
+          TezConstants.getTezUberServicePluginName())) {
+      if (containerId != null && containerNodeId != null && nodeHttpAddress != null) {
+        final String containerIdStr = containerId.toString();
+        if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+            && conf.get(YarnConfiguration.YARN_LOG_SERVER_URL) != null) {
+          String contextStr = "v_" + getVertex().getName()
+              + "_" + this.attemptId.toString();
+          completedLogsUrl = conf.get(YarnConfiguration.YARN_LOG_SERVER_URL)
+              + "/" + containerNodeId.toString()
+              + "/" + containerIdStr
+              + "/" + contextStr
+              + "/" + this.appContext.getUser();
+        }
       }
+    } else {
+      completedLogsUrl = appContext.getTaskCommunicatorManager().getCompletedLogsUrl(
+          getVertex().getTaskCommunicatorIdentifier(),
+          attemptId, containerNodeId);
     }
     return completedLogsUrl;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index ea202f7..b22af1a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -247,6 +247,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   @VisibleForTesting
   final int taskCommunicatorIdentifier;
 
+  final ServicePluginInfo servicePluginInfo;
+
+
   //fields initialized in init
 
   @VisibleForTesting
@@ -1009,6 +1012,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
       LOG.error("Failed to get index for containerLauncher: " + containerLauncherName);
       throw e;
     }
+    this.servicePluginInfo = new ServicePluginInfo()
+        .setContainerLauncherName(
+            appContext.getContainerLauncherName(this.containerLauncherIdentifier))
+        .setTaskSchedulerName(appContext.getTaskSchedulerName(this.taskSchedulerIdentifier))
+        .setTaskCommunicatorName(appContext.getTaskCommunicatorName(this.taskCommunicatorIdentifier))
+        .setContainerLauncherClassName(
+            appContext.getContainerLauncherClassName(this.containerLauncherIdentifier))
+        .setTaskSchedulerClassName(
+            appContext.getTaskSchedulerClassName(this.taskSchedulerIdentifier))
+        .setTaskCommunicatorClassName(
+            appContext.getTaskCommunicatorClassName(this.taskCommunicatorIdentifier));
 
     StringBuilder sb = new StringBuilder();
     sb.append("Running vertex: ").append(logIdentifier).append(" : ")
@@ -1043,6 +1057,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   }
 
   @Override
+  public ServicePluginInfo getServicePluginInfo() {
+    return servicePluginInfo;
+  }
+
+  @Override
   public boolean isSpeculationEnabled() {
     return isSpeculationEnabled;
   }
@@ -1923,8 +1942,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   void logJobHistoryVertexInitializedEvent() {
     if (recoveryData == null || !recoveryData.shouldSkipInit()) {
       VertexInitializedEvent initEvt = new VertexInitializedEvent(vertexId, vertexName,
-              initTimeRequested, initedTime, numTasks,
-              getProcessorName(), getAdditionalInputs(), initGeneratedEvents);
+          initTimeRequested, initedTime, numTasks,
+          getProcessorName(), getAdditionalInputs(), initGeneratedEvents,
+          servicePluginInfo);
       this.appContext.getHistoryHandler().handle(
               new DAGHistoryEvent(getDAGId(), initEvt));
     }
@@ -1989,9 +2009,9 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     taskStats.put(ATSConstants.NUM_FAILED_TASKS_ATTEMPTS, failedTaskAttemptCount.get());
     taskStats.put(ATSConstants.NUM_KILLED_TASKS_ATTEMPTS, killedTaskAttemptCount.get());
 
-    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks, initTimeRequested,
-        initedTime, startTimeRequested, startedTime, finishTime, finalState, diagnostics,
-        counters, getVertexStats(), taskStats);
+    VertexFinishedEvent finishEvt = new VertexFinishedEvent(vertexId, vertexName, numTasks,
+        initTimeRequested, initedTime, startTimeRequested, startedTime, finishTime, finalState,
+        diagnostics, counters, getVertexStats(), taskStats, servicePluginInfo);
     this.appContext.getHistoryHandler().handleCriticalEvent(
         new DAGHistoryEvent(getDAGId(), finishEvt));
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
index 1f5151b..f2c1cff 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/ContainerLauncherManager.java
@@ -279,4 +279,8 @@ public class ContainerLauncherManager extends AbstractService
   private void sendEvent(Event<?> event) {
     appContext.getEventHandler().handle(event);
   }
+
+  public String getContainerLauncherClassName(int containerLauncherIndex) {
+    return containerLaunchers[containerLauncherIndex].getContainerLauncher().getClass().getName();
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
index 16f9a28..e68c9b8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerManager.java
@@ -996,4 +996,9 @@ public class TaskSchedulerManager extends AbstractService implements
 
     return historyUrl;
   }
+
+  public String getTaskSchedulerClassName(int taskSchedulerIndex) {
+    return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
index f914165..a2cdae2 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexFinishedEvent.java
@@ -23,6 +23,7 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Map;
 
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.tez.common.counters.TezCounters;
@@ -55,12 +56,14 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   private boolean fromSummary = false;
   private VertexStats vertexStats;
   private Map<String, Integer> vertexTaskStats;
+  private ServicePluginInfo servicePluginInfo;
 
   public VertexFinishedEvent(TezVertexID vertexId, String vertexName, int numTasks, long initRequestedTime,
                              long initedTime, long startRequestedTime, long startedTime,
                              long finishTime, VertexState state, String diagnostics,
                              TezCounters counters, VertexStats vertexStats,
-                             Map<String, Integer> vertexTaskStats) {
+                             Map<String, Integer> vertexTaskStats,
+                             ServicePluginInfo servicePluginInfo) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.numTasks = numTasks;
@@ -74,6 +77,7 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
     this.tezCounters = counters;
     this.vertexStats = vertexStats;
     this.vertexTaskStats = vertexTaskStats;
+    this.servicePluginInfo = servicePluginInfo;
   }
 
   public VertexFinishedEvent() {
@@ -147,7 +151,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
         + ", counters=" + ( tezCounters == null ? "null" :
           tezCounters.toString().replaceAll("\\n", ", ").replaceAll("\\s+", " "))
         + ", vertexStats=" + (vertexStats == null ? "null" : vertexStats.toString())
-        + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString());
+        + ", vertexTaskStats=" + (vertexTaskStats == null ? "null" : vertexTaskStats.toString())
+        + ", servicePluginInfo="
+        + (servicePluginInfo != null ? servicePluginInfo : "null");
   }
 
   public TezVertexID getVertexID() {
@@ -227,4 +233,9 @@ public class VertexFinishedEvent implements HistoryEvent, SummaryEvent {
   public boolean isFromSummary() {
     return fromSummary;
   }
+
+  public ServicePluginInfo getServicePluginInfo() {
+    return servicePluginInfo;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
index 052908b..90099fc 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/VertexInitializedEvent.java
@@ -30,6 +30,7 @@ import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.InputInitializerDescriptor;
 import org.apache.tez.dag.api.RootInputLeafOutput;
 import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.apache.tez.dag.history.HistoryEvent;
 import org.apache.tez.dag.history.HistoryEventType;
 import org.apache.tez.dag.history.utils.TezEventUtils;
@@ -51,6 +52,7 @@ public class VertexInitializedEvent implements HistoryEvent {
   private String processorName;
   private Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs;
   private List<TezEvent> initGeneratedEvents;
+  private ServicePluginInfo servicePluginInfo;
 
   public VertexInitializedEvent() {
   }
@@ -59,7 +61,7 @@ public class VertexInitializedEvent implements HistoryEvent {
       String vertexName, long initRequestedTime, long initedTime,
       int numTasks, String processorName,
       Map<String, RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> additionalInputs,
-      List<TezEvent> initGeneratedEvents) {
+      List<TezEvent> initGeneratedEvents, ServicePluginInfo servicePluginInfo) {
     this.vertexName = vertexName;
     this.vertexID = vertexId;
     this.initRequestedTime = initRequestedTime;
@@ -68,6 +70,7 @@ public class VertexInitializedEvent implements HistoryEvent {
     this.processorName = processorName;
     this.additionalInputs = additionalInputs;
     this.initGeneratedEvents = initGeneratedEvents;
+    this.servicePluginInfo = servicePluginInfo;
   }
 
   @Override
@@ -173,7 +176,9 @@ public class VertexInitializedEvent implements HistoryEvent {
         + ", additionalInputsCount="
         + (additionalInputs != null ? additionalInputs.size() : 0)
         + ", initGeneratedEventsCount="
-        + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0);
+        + (initGeneratedEvents != null ? initGeneratedEvents.size() : 0)
+        + ", servicePluginInfo="
+        + (servicePluginInfo != null ? servicePluginInfo : "null");
   }
 
   public TezVertexID getVertexID() {
@@ -208,4 +213,9 @@ public class VertexInitializedEvent implements HistoryEvent {
   public List<TezEvent> getInitGeneratedEvents() {
     return initGeneratedEvents;
   }
+
+  public ServicePluginInfo getServicePluginInfo() {
+    return servicePluginInfo;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 75116c8..a767fbf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -716,6 +716,10 @@ public class HistoryEventJsonConversion {
         otherInfo.put(entry.getKey(), entry.getValue().intValue());
       }
     }
+    if (event.getServicePluginInfo() != null) {
+      otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo()));
+    }
 
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
@@ -787,6 +791,10 @@ public class HistoryEventJsonConversion {
     otherInfo.put(ATSConstants.INIT_TIME, event.getInitedTime());
     otherInfo.put(ATSConstants.NUM_TASKS, event.getNumTasks());
     otherInfo.put(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+    if (event.getServicePluginInfo() != null) {
+      otherInfo.put(ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToJSON(event.getServicePluginInfo()));
+    }
     jsonObject.put(ATSConstants.OTHER_INFO, otherInfo);
 
     return jsonObject;

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
index 781120c..f1ac0ab 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/utils/DAGUtils.java
@@ -42,6 +42,7 @@ import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.records.DAGProtos;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.api.records.DAGProtos.PlanGroupInputEdgeInfo;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.history.logging.EntityTypes;
@@ -405,6 +406,34 @@ public class DAGUtils {
     return vertexStatsMap;
   }
 
+  public static JSONObject convertServicePluginToJSON(
+      ServicePluginInfo servicePluginInfo) {
+    JSONObject jsonObject = new JSONObject(convertServicePluginToATSMap(servicePluginInfo));
+    return jsonObject;
+  }
+
+  public static Map<String,Object> convertServicePluginToATSMap(
+      ServicePluginInfo servicePluginInfo) {
+    Map<String, Object> servicePluginMap = new LinkedHashMap<String, Object>();
+    if (servicePluginInfo == null) {
+      return servicePluginMap;
+    }
+    servicePluginMap.put(ATSConstants.TASK_SCHEDULER_NAME,
+        servicePluginInfo.getTaskSchedulerName());
+    servicePluginMap.put(ATSConstants.TASK_SCHEDULER_CLASS_NAME,
+        servicePluginInfo.getTaskSchedulerClassName());
+    servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_NAME,
+        servicePluginInfo.getTaskCommunicatorName());
+    servicePluginMap.put(ATSConstants.TASK_COMMUNICATOR_CLASS_NAME,
+        servicePluginInfo.getTaskCommunicatorClassName());
+    servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_NAME,
+        servicePluginInfo.getContainerLauncherName());
+    servicePluginMap.put(ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME,
+        servicePluginInfo.getContainerLauncherClassName());
+    return servicePluginMap;
+  }
+
+
   public static Map<String,Object> convertEdgeProperty(
       EdgeProperty edge) {
     Map<String, Object> jsonDescriptor = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
index 8f919d1..34debd4 100644
--- a/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/serviceplugins/api/TaskCommunicator.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.tez.common.ServicePluginLifecycle;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -229,4 +230,25 @@ public abstract class TaskCommunicator implements ServicePluginLifecycle {
    *                               This will cause the app to shutdown.
    */
   public abstract Object getMetaInfo() throws ServicePluginException;
+
+  /**
+   * Return a URL that can be used as a link to the logs for a running attempt.
+   * @param attemptID Attempt ID for which the log link should be provided
+   * @param containerNodeId Node Id on which the attempt is meant to have run
+   * @return URL to logs for the attempt
+   */
+  public String getInProgressLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
+    return null;
+  }
+
+  /**
+   * Return a URL that can be used as a link to the logs for a completed attempt.
+   * @param attemptID Attempt ID for which the log link should be provided
+   * @param containerNodeId Node Id on which the attempt is meant to have run
+   * @return URL to logs for the attempt
+   */
+  public String getCompletedLogsUrl(TezTaskAttemptID attemptID, NodeId containerNodeId) {
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
index 962b230..f4edf9e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestRecoveryParser.java
@@ -453,7 +453,7 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, 
             0L, 0L, 0L, VertexState.SUCCEEDED, 
-            "", null, null, null)));
+            "", null, null, null, null)));
     rService.stop();
 
     DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -531,11 +531,11 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexFinishedEvent(v0, "v1", 10, 0L, 0L, 
             0L, 0L, 0L, VertexState.SUCCEEDED, 
-            "", null, null, null)));
+            "", null, null, null, null)));
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexFinishedEvent(v1, "v1", 10, 0L, 0L, 
             0L, 0L, 0L, VertexState.SUCCEEDED, 
-            "", null, null, null)));
+            "", null, null, null, null)));
     rService.stop();
     
     DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -573,7 +573,7 @@ public class TestRecoveryParser {
     rService.handle(new DAGHistoryEvent(dagID,
         new VertexFinishedEvent(vertexId, "v1", 10, 0L, 0L, 
             0L, 0L, 0L, VertexState.SUCCEEDED, 
-            "", null, null, null)));
+            "", null, null, null, null)));
     rService.stop();
 
     DAGRecoveryData dagData = parser.parseRecoveryData();
@@ -652,18 +652,19 @@ public class TestRecoveryParser {
     TezVertexID v1Id = TezVertexID.getInstance(dagID, 1);
     TezVertexID v2Id = TezVertexID.getInstance(dagID, 2);
     // v0 VertexInitializedEvent
-    VertexInitializedEvent v0InitedEvent =  new VertexInitializedEvent(v0Id, "v0", 200L, 400L, 2, null, null, null);
+    VertexInitializedEvent v0InitedEvent =  new VertexInitializedEvent(
+        v0Id, "v0", 200L, 400L, 2, null, null, null, null);
     rService.handle(new DAGHistoryEvent(dagID, v0InitedEvent));
     // v1 VertexFinishedEvent(KILLED)
     VertexFinishedEvent v1FinishedEvent = new VertexFinishedEvent(v1Id, "v1", 2, 300L, 400L, 
         500L, 600L, 700L, VertexState.KILLED, 
-        "", null, null, null);
+        "", null, null, null, null);
     rService.handle(new DAGHistoryEvent(dagID, v1FinishedEvent));
     // v2 VertexInitializedEvent -> VertexStartedEvent
     List<TezEvent> initGeneratedEvents = Lists.newArrayList(
         new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), null));
     VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, "v2", 200L, 300L,
-        2, null, null, initGeneratedEvents);
+        2, null, null, initGeneratedEvents, null);
     VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, 0L);
     rService.handle(new DAGHistoryEvent(dagID, v2InitedEvent));
     rService.handle(new DAGHistoryEvent(dagID, v2StartedEvent));

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
index 480e3cf..4471278 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGImpl.java
@@ -19,9 +19,9 @@
 package org.apache.tez.dag.app.dag.impl;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -40,6 +40,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.lang.StringUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
 import org.apache.tez.hadoop.shim.HadoopShim;
@@ -986,6 +987,8 @@ public class TestDAGImpl {
     doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo();
     dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
     dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
+    when(dagWithCustomEdgeAppContext.getContainerLauncherName(anyInt())).thenReturn(
+        TezConstants.getTezYarnServicePluginName());
   }
 
   private void initDAG(DAGImpl impl) {

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
index be1821b..3c284ec 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestDAGRecovery.java
@@ -700,7 +700,7 @@ public class TestDAGRecovery {
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
     VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
         "vertex1", 0L, v1InitedTime, 
-        v1NumTask, "", null, inputGeneratedTezEvents);
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
     VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
         null, null, null, null, false);
     doReturn(vertexRecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
@@ -732,7 +732,7 @@ public class TestDAGRecovery {
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
     VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
         "vertex1", 0L, v1InitedTime, 
-        v1NumTask, "", null, inputGeneratedTezEvents);
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
         0L, v1NumTask, null, null, null, true);
     VertexRecoveryData vertexRecoveryData = new VertexRecoveryData(v1InitedEvent,
@@ -772,7 +772,7 @@ public class TestDAGRecovery {
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
     VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
         "vertex1", 0L, v1InitedTime, 
-        v1NumTask, "", null, inputGeneratedTezEvents);
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
         0L, v1NumTask, null, null, null, true);
     VertexStartedEvent v1StartedEvent = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
@@ -803,7 +803,7 @@ public class TestDAGRecovery {
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
     VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
         "vertex1", 0L, v1InitedTime, 
-        v1NumTask, "", null, inputGeneratedTezEvents);
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
     Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
         0L, v1NumTask, null, null, rootInputSpecs, true);
@@ -911,7 +911,7 @@ public class TestDAGRecovery {
     List<TezEvent> inputGeneratedTezEvents = new ArrayList<TezEvent>();
     VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(v1Id, 
         "vertex1", 0L, v1InitedTime, 
-        v1NumTask, "", null, inputGeneratedTezEvents);
+        v1NumTask, "", null, inputGeneratedTezEvents, null);
     Map<String, InputSpecUpdate> rootInputSpecs = new HashMap<String, InputSpecUpdate>();
     VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v1Id, 
         0L, v1NumTask, null, null, rootInputSpecs, true);
@@ -1079,7 +1079,7 @@ public class TestDAGRecovery {
 
     VertexInitializedEvent v2InitedEvent = new VertexInitializedEvent(v2Id, 
         "vertex2", 0L, v1InitedTime, 
-        v1NumTask, "", null, null);
+        v1NumTask, "", null, null, null);
     VertexConfigurationDoneEvent v2ReconfigureDoneEvent = new VertexConfigurationDoneEvent(v2Id, 
         0L, v1NumTask, null, null, null, false);
     VertexStartedEvent v2StartedEvent = new VertexStartedEvent(v2Id, 0L, v1StartedTime);

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 68cfc39..e4cd956 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.app.dag.impl;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.tez.common.MockDNSToSwitchMapping;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
@@ -139,6 +141,9 @@ public class TestTaskAttempt {
   AppContext appCtx;
   Task mockTask;
   TaskLocationHint locationHint;
+  Vertex mockVertex;
+  ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
+      .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
 
   @BeforeClass
   public static void setup() {
@@ -148,7 +153,14 @@ public class TestTaskAttempt {
   @Before
   public void setupTest() {
     appCtx = mock(AppContext.class);
+    when(appCtx.getContainerLauncherName(anyInt())).thenReturn(
+        TezConstants.getTezYarnServicePluginName());
+
     mockTask = mock(Task.class);
+    mockVertex = mock(Vertex.class);
+    when(mockTask.getVertex()).thenReturn(mockVertex);
+    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+
     HistoryEventHandler mockHistHandler = mock(HistoryEventHandler.class);
     doReturn(mockHistHandler).when(appCtx).getHistoryHandler();
     LogManager.getRootLogger().setLevel(Level.DEBUG);
@@ -1737,7 +1749,6 @@ public class TestTaskAttempt {
     }
 
     
-    Vertex mockVertex = mock(Vertex.class);
     boolean inputFailedReported = false;
     
     @Override

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index d2d9a07..6b30a24 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.counters.Limits;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConstants;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
 import org.apache.tez.hadoop.shim.DefaultHadoopShim;
@@ -2354,6 +2355,9 @@ public class TestVertexImpl {
     dispatcher = new DrainDispatcher();
     appContext = mock(AppContext.class);
     when(appContext.getHadoopShim()).thenReturn(new DefaultHadoopShim());
+    when(appContext.getContainerLauncherName(anyInt())).thenReturn(
+        TezConstants.getTezYarnServicePluginName());
+
     thh = mock(TaskHeartbeatHandler.class);
     historyEventHandler = mock(HistoryEventHandler.class);
     TaskSchedulerManager taskScheduler = mock(TaskSchedulerManager.class);

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 5e7e906..7d3faf9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -276,7 +276,7 @@ public class TestHistoryEventsProtoConversion {
     VertexInitializedEvent event = new VertexInitializedEvent(
         TezVertexID.getInstance(
             TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
-        "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents);
+        "vertex1", 1000l, 15000l, 100, "procName", null, initGeneratedEvents, null);
     VertexInitializedEvent deserializedEvent = (VertexInitializedEvent)
         testProtoConversion(event);
     Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -386,7 +386,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              null, null, null, null);
+              null, null, null, null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());
@@ -401,7 +401,7 @@ public class TestHistoryEventsProtoConversion {
           new VertexFinishedEvent(TezVertexID.getInstance(
               TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111),
               "vertex1", 1, 1000l, 15000l, 16000l, 20000l, 1344400l, VertexState.ERROR,
-              "diagnose", new TezCounters(), new VertexStats(), null);
+              "diagnose", new TezCounters(), new VertexStats(), null, null);
       VertexFinishedEvent deserializedEvent = (VertexFinishedEvent)
           testProtoConversion(event);
       Assert.assertEquals(event.getVertexID(), deserializedEvent.getVertexID());

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index 5c596c5..9477118 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -138,7 +138,7 @@ public class TestHistoryEventJsonConversion {
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
-              random.nextInt(), "proc", null, null);
+              random.nextInt(), "proc", null, null, null);
           break;
         case VERTEX_STARTED:
           event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
@@ -149,7 +149,7 @@ public class TestHistoryEventJsonConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null, null);
+              null, null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 1e77ce8..62f0937 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -515,8 +515,12 @@ public class HistoryEventTimelineConversion {
     atsEntity.addEvent(startEvt);
 
     atsEntity.addOtherInfo(ATSConstants.START_TIME, event.getStartTime());
-    atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
-    atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    if (event.getInProgressLogsUrl() != null) {
+      atsEntity.addOtherInfo(ATSConstants.IN_PROGRESS_LOGS_URL, event.getInProgressLogsUrl());
+    }
+    if (event.getCompletedLogsUrl() != null) {
+      atsEntity.addOtherInfo(ATSConstants.COMPLETED_LOGS_URL, event.getCompletedLogsUrl());
+    }
     atsEntity.addOtherInfo(ATSConstants.NODE_ID, event.getNodeId().toString());
     atsEntity.addOtherInfo(ATSConstants.NODE_HTTP_ADDRESS, event.getNodeHttpAddress());
     atsEntity.addOtherInfo(ATSConstants.CONTAINER_ID, event.getContainerId().toString());
@@ -615,6 +619,10 @@ public class HistoryEventTimelineConversion {
         DAGUtils.convertCountersToATSMap(event.getTezCounters()));
     atsEntity.addOtherInfo(ATSConstants.STATS,
         DAGUtils.convertVertexStatsToATSMap(event.getVertexStats()));
+    if (event.getServicePluginInfo() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+    }
 
     final Map<String, Integer> vertexTaskStats = event.getVertexTaskStats();
     if (vertexTaskStats != null) {
@@ -651,6 +659,10 @@ public class HistoryEventTimelineConversion {
     atsEntity.addOtherInfo(ATSConstants.INIT_TIME, event.getInitedTime());
     atsEntity.addOtherInfo(ATSConstants.NUM_TASKS, event.getNumTasks());
     atsEntity.addOtherInfo(ATSConstants.PROCESSOR_CLASS_NAME, event.getProcessorName());
+    if (event.getServicePluginInfo() != null) {
+      atsEntity.addOtherInfo(ATSConstants.SERVICE_PLUGIN,
+          DAGUtils.convertServicePluginToATSMap(event.getServicePluginInfo()));
+    }
 
     return atsEntity;
   }

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index abfd757..f622056 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -47,6 +47,7 @@ import org.apache.tez.dag.api.records.DAGProtos.CallerContextProto;
 import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
 import org.apache.tez.dag.app.dag.DAGState;
 import org.apache.tez.dag.app.dag.VertexState;
+import org.apache.tez.dag.app.dag.impl.ServicePluginInfo;
 import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl.DataEventDependencyInfo;
 import org.apache.tez.dag.app.dag.impl.VertexStats;
 import org.apache.tez.dag.app.web.AMWebController;
@@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion {
           break;
         case VERTEX_INITIALIZED:
           event = new VertexInitializedEvent(tezVertexID, "v1", random.nextInt(), random.nextInt(),
-              random.nextInt(), "proc", null, null);
+              random.nextInt(), "proc", null, null, null);
           break;
         case VERTEX_STARTED:
           event = new VertexStartedEvent(tezVertexID, random.nextInt(), random.nextInt());
@@ -168,7 +169,7 @@ public class TestHistoryEventTimelineConversion {
         case VERTEX_FINISHED:
           event = new VertexFinishedEvent(tezVertexID, "v1", 1, random.nextInt(), random.nextInt(),
               random.nextInt(), random.nextInt(), random.nextInt(), VertexState.ERROR,
-              null, null, null, null);
+              null, null, null, null, null);
           break;
         case TASK_STARTED:
           event = new TaskStartedEvent(tezTaskID, "v1", random.nextInt(), random.nextInt());
@@ -677,13 +678,19 @@ public class TestHistoryEventTimelineConversion {
         ((Integer) timelineEntity.getOtherInfo().get("BAR")).intValue());
   }
 
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testConvertVertexInitializedEvent() {
     long initRequestedTime = random.nextLong();
     long initedTime = random.nextLong();
     int numTasks = random.nextInt();
     VertexInitializedEvent event = new VertexInitializedEvent(tezVertexID, "v1", initRequestedTime,
-        initedTime, numTasks, "proc", null, null);
+        initedTime, numTasks, "proc", null, null,
+        new ServicePluginInfo().setContainerLauncherName("abc")
+            .setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
+            .setContainerLauncherClassName("abc1")
+            .setTaskSchedulerClassName("def1")
+            .setTaskCommunicatorClassName("ghi1"));
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
@@ -720,6 +727,25 @@ public class TestHistoryEventTimelineConversion {
         ((Long) timelineEntity.getOtherInfo().get(ATSConstants.INIT_TIME)).longValue());
     Assert.assertEquals(numTasks,
         ((Integer) timelineEntity.getOtherInfo().get(ATSConstants.NUM_TASKS)).intValue());
+    Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
+    Assert.assertEquals("abc",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.CONTAINER_LAUNCHER_NAME));
+    Assert.assertEquals("def",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_SCHEDULER_NAME));
+    Assert.assertEquals("ghi",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_COMMUNICATOR_NAME));
+    Assert.assertEquals("abc1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
+    Assert.assertEquals("def1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_SCHEDULER_CLASS_NAME));
+    Assert.assertEquals("ghi1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
   }
 
   @Test(timeout = 5000)
@@ -756,6 +782,7 @@ public class TestHistoryEventTimelineConversion {
             timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
   }
 
+  @SuppressWarnings("unchecked")
   @Test(timeout = 5000)
   public void testConvertVertexFinishedEvent() {
     long initRequestedTime = random.nextLong();
@@ -770,7 +797,12 @@ public class TestHistoryEventTimelineConversion {
 
     VertexFinishedEvent event = new VertexFinishedEvent(tezVertexID, "v1", 1, initRequestedTime,
         initedTime, startRequestedTime, startTime, finishTime, VertexState.ERROR,
-        "diagnostics", null, vertexStats, taskStats);
+        "diagnostics", null, vertexStats, taskStats,
+        new ServicePluginInfo().setContainerLauncherName("abc")
+            .setTaskSchedulerName("def").setTaskCommunicatorName("ghi")
+            .setContainerLauncherClassName("abc1")
+            .setTaskSchedulerClassName("def1")
+            .setTaskCommunicatorClassName("ghi1"));
 
     TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
     Assert.assertEquals(EntityTypes.TEZ_VERTEX_ID.name(), timelineEntity.getEntityType());
@@ -801,6 +833,25 @@ public class TestHistoryEventTimelineConversion {
         timelineEntity.getOtherInfo().get(ATSConstants.STATUS));
     Assert.assertEquals("diagnostics",
         timelineEntity.getOtherInfo().get(ATSConstants.DIAGNOSTICS));
+    Assert.assertNotNull(timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN));
+    Assert.assertEquals("abc",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.CONTAINER_LAUNCHER_NAME));
+    Assert.assertEquals("def",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_SCHEDULER_NAME));
+    Assert.assertEquals("ghi",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_COMMUNICATOR_NAME));
+    Assert.assertEquals("abc1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.CONTAINER_LAUNCHER_CLASS_NAME));
+    Assert.assertEquals("def1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_SCHEDULER_CLASS_NAME));
+    Assert.assertEquals("ghi1",
+        ((Map<String, Object>)timelineEntity.getOtherInfo().get(ATSConstants.SERVICE_PLUGIN)).get(
+            ATSConstants.TASK_COMMUNICATOR_CLASS_NAME));
 
     Assert.assertTrue(timelineEntity.getOtherInfo().containsKey(ATSConstants.STATS));
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4694a9ef/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
index 3f669c6..b9229e2 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestRecovery.java
@@ -165,13 +165,13 @@ public class TestRecovery {
                 ApplicationAttemptId.newInstance(appId, 1), null)),
             new SimpleShutdownCondition(TIMING.POST,
                 new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
-                    "", null, initGeneratedEvents)),
+                    "", null, initGeneratedEvents, null)),
             new SimpleShutdownCondition(TIMING.POST,
                 new VertexInitializedEvent(vertexId1, "Summation", 0L, 0L, 0,
-                    "", null, null)),
+                    "", null, null, null)),
             new SimpleShutdownCondition(TIMING.POST,
                 new VertexInitializedEvent(vertexId2, "Sorter", 0L, 0L, 0, "",
-                    null, null)),
+                    null, null, null)),
 
             new SimpleShutdownCondition(TIMING.POST,
                 new VertexConfigurationDoneEvent(vertexId0, 0L, 2, null, null,
@@ -194,15 +194,15 @@ public class TestRecovery {
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                 vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                 VertexState.SUCCEEDED, "", new TezCounters(),
-                new VertexStats(), new HashMap<String, Integer>())),
+                new VertexStats(), new HashMap<String, Integer>(), null)),
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                 vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                 VertexState.SUCCEEDED, "", new TezCounters(),
-                new VertexStats(), new HashMap<String, Integer>())),
+                new VertexStats(), new HashMap<String, Integer>(), null)),
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                 vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                 VertexState.SUCCEEDED, "", new TezCounters(),
-                new VertexStats(), new HashMap<String, Integer>())),
+                new VertexStats(), new HashMap<String, Integer>(), null)),
 
             new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(
                 TezTaskID.getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
@@ -376,11 +376,11 @@ public class TestRecovery {
             "dagName", new HashMap<String, Integer>(), ApplicationAttemptId
                 .newInstance(appId, 1), null)),
         new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
-            vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents)),
+            vertexId0, "hashSide", 0L, 0L, 0, "", null, initGeneratedEvents, null)),
         new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
-            vertexId1, "streamingSide", 0L, 0L, 0, "", null, null)),
+            vertexId1, "streamingSide", 0L, 0L, 0, "", null, null, null)),
         new SimpleShutdownCondition(TIMING.POST, new VertexInitializedEvent(
-            vertexId2, "joiner", 0L, 0L, 0, "", null, null)),
+            vertexId2, "joiner", 0L, 0L, 0, "", null, null, null)),
 
         new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
             vertexId0, 0L, 0L)),
@@ -404,15 +404,15 @@ public class TestRecovery {
         new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
             vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
             VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
-            new HashMap<String, Integer>())),
+            new HashMap<String, Integer>(), null)),
         new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
             vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
             VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
-            new HashMap<String, Integer>())),
+            new HashMap<String, Integer>(), null)),
         new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
             vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
             VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(),
-            new HashMap<String, Integer>())),
+            new HashMap<String, Integer>(), null)),
 
         new SimpleShutdownCondition(TIMING.POST, new TaskStartedEvent(TezTaskID
             .getInstance(vertexId0, 0), "vertexName", 0L, 0L)),
@@ -574,7 +574,7 @@ public class TestRecovery {
                     0L, "username", "dagName")),
             new SimpleShutdownCondition(TIMING.POST,
                     new VertexInitializedEvent(vertexId0, "Tokenizer", 0L, 0L, 0,
-                            "", null, initGeneratedEvents)),
+                            "", null, initGeneratedEvents, null)),
             new SimpleShutdownCondition(TIMING.POST, new VertexStartedEvent(
                     vertexId0, 0L, 0L)),
             new SimpleShutdownCondition(TIMING.POST,
@@ -592,15 +592,15 @@ public class TestRecovery {
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                     vertexId0, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                     VertexState.SUCCEEDED, "", new TezCounters(),
-                    new VertexStats(), new HashMap<String, Integer>())),
+                    new VertexStats(), new HashMap<String, Integer>(), null)),
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                     vertexId1, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                     VertexState.SUCCEEDED, "", new TezCounters(),
-                    new VertexStats(), new HashMap<String, Integer>())),
+                    new VertexStats(), new HashMap<String, Integer>(), null)),
             new SimpleShutdownCondition(TIMING.POST, new VertexFinishedEvent(
                     vertexId2, "vertexName", 1, 0L, 0L, 0L, 0L, 0L,
                     VertexState.SUCCEEDED, "", new TezCounters(),
-                    new VertexStats(), new HashMap<String, Integer>())),
+                    new VertexStats(), new HashMap<String, Integer>(), null)),
             new SimpleShutdownCondition(TIMING.POST, new DAGFinishedEvent(
                     dagId, 0L, 0L, DAGState.SUCCEEDED, "", new TezCounters(),
                     "username", "dagName", new HashMap<String, Integer>(),


Mime
View raw message