hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [38/50] [abbrv] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date Sat, 16 Jul 2016 03:36:59 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 1e3b854..cb63ae3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -117,6 +117,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl.FlowContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
@@ -127,6 +128,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
@@ -144,14 +146,15 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.ByteString;
 
-import org.apache.hadoop.yarn.util.resource.Resources;
-
 public class ContainerManagerImpl extends CompositeService implements
     ContainerManager {
 
@@ -189,6 +192,9 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private long waitForContainersOnShutdownMillis;
 
+  // Set only if timeline service v.2 and system metrics publishing are enabled
+  private NMTimelinePublisher nmMetricsPublisher;
+
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -215,6 +221,15 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
+    // initialize the metrics publisher if the timeline service v.2 is enabled
+    // and the system metrics publisher is enabled
+    Configuration conf = context.getConf();
+    if (YarnConfiguration.timelineServiceV2Enabled(conf) &&
+        YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+      LOG.info("YARN system metrics publishing service is enabled");
+      nmMetricsPublisher = createNMTimelinePublisher(context);
+      context.setNMTimelinePublisher(nmMetricsPublisher);
+    }
     this.containersMonitor = createContainersMonitor(exec);
     addService(this.containersMonitor);
 
@@ -222,7 +237,9 @@ public class ContainerManagerImpl extends CompositeService implements
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
         createApplicationEventDispatcher());
-    dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
+    dispatcher.register(LocalizationEventType.class,
+        new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,
+            nmMetricsPublisher));
     dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
@@ -331,6 +348,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
 
     LOG.info("Recovering application " + appId);
+    //TODO: Recover flow and flow run ID
     ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), appId,
         creds, context, p.getAppLogAggregationInitedTime());
     context.getApplications().put(appId, app);
@@ -427,6 +445,14 @@ public class ContainerManagerImpl extends CompositeService implements
     return new SharedCacheUploadService();
   }
 
+  @VisibleForTesting
+  protected NMTimelinePublisher createNMTimelinePublisher(Context ctxt) {
+    NMTimelinePublisher nmTimelinePublisherLocal =
+        new NMTimelinePublisher(ctxt);
+    addIfService(nmTimelinePublisherLocal);
+    return nmTimelinePublisherLocal;
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@@ -951,27 +977,46 @@ public class ContainerManagerImpl extends CompositeService implements
     try {
       if (!isServiceStopped()) {
         // Create the application
-        Application application = new ApplicationImpl(dispatcher, user,
-            applicationID, credentials, context);
-        if (null == context.getApplications().putIfAbsent(applicationID,
-          application)) {
-          LOG.info("Creating a new application reference for app "
-              + applicationID);
-          LogAggregationContext logAggregationContext =
-              containerTokenIdentifier.getLogAggregationContext();
-          Map<ApplicationAccessType, String> appAcls =
-              container.getLaunchContext().getApplicationACLs();
-          context.getNMStateStore().storeApplication(applicationID,
-              buildAppProto(applicationID, user, credentials, appAcls,
-                logAggregationContext));
-          dispatcher.getEventHandler().handle(
-            new ApplicationInitEvent(applicationID, appAcls,
-              logAggregationContext));
+        // populate the flow context from the launch context if the timeline
+        // service v.2 is enabled
+        FlowContext flowContext = null;
+        if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+          String flowName = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_NAME_TAG_PREFIX);
+          String flowVersion = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_VERSION_TAG_PREFIX);
+          String flowRunIdStr = launchContext.getEnvironment().get(
+              TimelineUtils.FLOW_RUN_ID_TAG_PREFIX);
+          long flowRunId = 0L;
+          if (flowRunIdStr != null && !flowRunIdStr.isEmpty()) {
+            flowRunId = Long.parseLong(flowRunIdStr);
+          }
+          flowContext =
+              new FlowContext(flowName, flowVersion, flowRunId);
+        }
+        if (!context.getApplications().containsKey(applicationID)) {
+          Application application =
+              new ApplicationImpl(dispatcher, user, flowContext,
+                  applicationID, credentials, context);
+          if (context.getApplications().putIfAbsent(applicationID,
+              application) == null) {
+            LOG.info("Creating a new application reference for app "
+                + applicationID);
+            LogAggregationContext logAggregationContext =
+                containerTokenIdentifier.getLogAggregationContext();
+            Map<ApplicationAccessType, String> appAcls =
+                container.getLaunchContext().getApplicationACLs();
+            context.getNMStateStore().storeApplication(applicationID,
+                buildAppProto(applicationID, user, credentials, appAcls,
+                    logAggregationContext));
+            dispatcher.getEventHandler().handle(new ApplicationInitEvent(
+                applicationID, appAcls, logAggregationContext));
+          }
         }
 
-        this.context.getNMStateStore().storeContainer(containerId, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
+        this.context.getNMStateStore().storeContainer(containerId, request);
 
         this.context.getContainerTokenSecretManager().startContainerSuccessful(
           containerTokenIdentifier);
@@ -1304,6 +1349,9 @@ public class ContainerManagerImpl extends CompositeService implements
       Container c = containers.get(event.getContainerID());
       if (c != null) {
         c.handle(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishContainerEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent container " +
             event.getContainerID());
@@ -1312,7 +1360,6 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
-
     @Override
     public void handle(ApplicationEvent event) {
       Application app =
@@ -1320,6 +1367,9 @@ public class ContainerManagerImpl extends CompositeService implements
               event.getApplicationID());
       if (app != null) {
         app.handle(event);
+        if (nmMetricsPublisher != null) {
+          nmMetricsPublisher.publishApplicationEvent(event);
+        }
       } else {
         LOG.warn("Event " + event + " sent to absent application "
             + event.getApplicationID());
@@ -1327,6 +1377,27 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  private static final class LocalizationEventHandlerWrapper implements
+      EventHandler<LocalizationEvent> {
+
+    private EventHandler<LocalizationEvent> origLocalizationEventHandler;
+    private NMTimelinePublisher timelinePublisher;
+
+    LocalizationEventHandlerWrapper(EventHandler<LocalizationEvent> handler,
+        NMTimelinePublisher publisher) {
+      this.origLocalizationEventHandler = handler;
+      this.timelinePublisher = publisher;
+    }
+
+    @Override
+    public void handle(LocalizationEvent event) {
+      origLocalizationEventHandler.handle(event);
+      if (timelinePublisher != null) {
+        timelinePublisher.publishLocalizationEvent(event);
+      }
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainerManagerEvent event) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
index b1571e9..aee0862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
@@ -35,4 +35,9 @@ public interface Application extends EventHandler<ApplicationEvent> {
 
   ApplicationState getApplicationState();
 
+  String getFlowName();
+
+  String getFlowVersion();
+
+  long getFlowRunId();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
index 6b8007f..0a8ffdf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
@@ -19,18 +19,24 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 
 public class ApplicationContainerFinishedEvent extends ApplicationEvent {
-  private ContainerId containerID;
+  private ContainerStatus containerStatus;
 
-  public ApplicationContainerFinishedEvent(
-      ContainerId containerID) {
-    super(containerID.getApplicationAttemptId().getApplicationId(),
+  public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId().getApplicationAttemptId().
+        getApplicationId(),
         ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
-    this.containerID = containerID;
+    this.containerStatus = containerStatus;
   }
 
   public ContainerId getContainerID() {
-    return this.containerID;
+    return containerStatus.getContainerId();
   }
+
+  public ContainerStatus getContainerStatus() {
+    return containerStatus;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index c179dad..531693e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -30,6 +30,7 @@ import com.google.protobuf.ByteString;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -39,6 +40,7 @@ import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -55,6 +57,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -71,6 +74,8 @@ public class ApplicationImpl implements Application {
 
   final Dispatcher dispatcher;
   final String user;
+  // flow context is set only if the timeline service v.2 is enabled
+  private FlowContext flowContext;
   final ApplicationId appId;
   final Credentials credentials;
   Map<ApplicationAccessType, String> applicationACLs;
@@ -96,13 +101,35 @@ public class ApplicationImpl implements Application {
   private final NMStateStoreService appStateStore;
 
   public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId, Credentials credentials,
+      ApplicationId appId, Credentials credentials, Context context) {
+    this(dispatcher, user, null, appId, credentials, context, -1L);
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      ApplicationId appId, Credentials credentials, Context context,
+      long recoveredLogInitedTime) {
+    this(dispatcher, user, null, appId, credentials, context,
+      recoveredLogInitedTime);
+  }
+
+  public ApplicationImpl(Dispatcher dispatcher, String user,
+      FlowContext flowContext, ApplicationId appId, Credentials credentials,
       Context context, long recoveredLogInitedTime) {
     this.dispatcher = dispatcher;
     this.user = user;
     this.appId = appId;
     this.credentials = credentials;
     this.aclsManager = context.getApplicationACLsManager();
+    Configuration conf = context.getConf();
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      if (flowContext == null) {
+        throw new IllegalArgumentException("flow context cannot be null");
+      }
+      this.flowContext = flowContext;
+      if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+        context.getNMTimelinePublisher().createTimelineClient(appId);
+      }
+    }
     this.context = context;
     this.appStateStore = context.getNMStateStore();
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -113,8 +140,37 @@ public class ApplicationImpl implements Application {
   }
 
   public ApplicationImpl(Dispatcher dispatcher, String user,
-      ApplicationId appId, Credentials credentials, Context context) {
-    this(dispatcher, user, appId, credentials, context, -1);
+      FlowContext flowContext, ApplicationId appId,
+      Credentials credentials, Context context) {
+    this(dispatcher, user, flowContext, appId, credentials,
+      context, -1);
+  }
+
+  /**
+   * Data object that encapsulates the flow context of an application.
+   */
+  public static class FlowContext {
+    private final String flowName;
+    private final String flowVersion;
+    private final long flowRunId;
+
+    public FlowContext(String flowName, String flowVersion, long flowRunId) {
+      this.flowName = flowName;
+      this.flowVersion = flowVersion;
+      this.flowRunId = flowRunId;
+    }
+
+    public String getFlowName() {
+      return flowName;
+    }
+
+    public String getFlowVersion() {
+      return flowVersion;
+    }
+
+    public long getFlowRunId() {
+      return flowRunId;
+    }
   }
 
   @Override
@@ -496,6 +552,20 @@ public class ApplicationImpl implements Application {
           new LogHandlerAppFinishedEvent(app.appId));
 
       app.context.getNMTokenSecretManager().appFinished(app.getAppId());
+      // Remove collectors info for finished apps.
+      // TODO check we remove related collectors info in failure cases
+      // (YARN-3038)
+      Map<ApplicationId, String> registeredCollectors =
+          app.context.getRegisteredCollectors();
+      if (registeredCollectors != null) {
+        registeredCollectors.remove(app.getAppId());
+      }
+      // Stop the timeline client once the application has finished.
+      NMTimelinePublisher nmTimelinePublisher =
+          app.context.getNMTimelinePublisher();
+      if (nmTimelinePublisher != null) {
+        nmTimelinePublisher.stopTimelineClient(app.getAppId());
+      }
     }
   }
 
@@ -554,4 +624,19 @@ public class ApplicationImpl implements Application {
       this.readLock.unlock();
     }
   }
+
+  @Override
+  public String getFlowName() {
+    return flowContext == null ? null : flowContext.getFlowName();
+  }
+
+  @Override
+  public String getFlowVersion() {
+    return flowContext == null ? null : flowContext.getFlowVersion();
+  }
+
+  @Override
+  public long getFlowRunId() {
+    return flowContext == null ? 0L : flowContext.getFlowRunId();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
index 7571964..2278786 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -69,4 +70,6 @@ public interface Container extends EventHandler<ContainerEvent> {
 
   String toString();
 
+  Priority getPriority();
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
index 7a6e1cf..193dfea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -72,11 +74,11 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStartMonitoringEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainerStopMonitoringEvent;
-import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -84,7 +86,6 @@ import org.apache.hadoop.yarn.state.SingleArcTransition;
 import org.apache.hadoop.yarn.state.StateMachine;
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -197,6 +198,7 @@ public class ContainerImpl implements Container {
     }
 
     stateMachine = stateMachineFactory.make(this);
+    this.context = context;
   }
 
   // constructor for a recovered container
@@ -442,6 +444,10 @@ public class ContainerImpl implements Container {
     }
   }
 
+  public NMTimelinePublisher getNMTimelinePublisher() {
+    return context.getNMTimelinePublisher();
+  }
+
   @Override
   public String getUser() {
     this.readLock.lock();
@@ -575,7 +581,10 @@ public class ContainerImpl implements Container {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
-    eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
+
+    ContainerStatus containerStatus = cloneAndGetContainerStatus();
+    eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
+
     // Remove the container from the resource-monitor
     eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
     // Tell the logService too
@@ -1187,7 +1196,8 @@ public class ContainerImpl implements Container {
         container.containerMetrics.finished();
       }
       container.sendFinishedEvents();
-      //if the current state is NEW it means the CONTAINER_INIT was never 
+
+      // if the current state is NEW it means the CONTAINER_INIT was never
       // sent for the event, thus no need to send the CONTAINER_STOP
       if (container.getCurrentState()
           != org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
@@ -1384,4 +1394,9 @@ public class ContainerImpl implements Container {
   ContainerRetryContext getContainerRetryContext() {
     return containerRetryContext;
   }
+
+  @Override
+  public Priority getPriority() {
+    return containerTokenIdentifier.getPriority();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
index b5c2747..e6a66bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
@@ -22,27 +22,30 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 public class ContainersMonitorImpl extends AbstractService implements
@@ -81,6 +84,11 @@ public class ContainersMonitorImpl extends AbstractService implements
   private static final long UNKNOWN_MEMORY_LIMIT = -1L;
   private int nodeCpuPercentageForYARN;
 
+  @Private
+  public static enum ContainerMetric {
+    CPU, MEMORY
+  }
+
   private ResourceUtilization containersUtilization;
   // Tracks the aggregated allocation of the currently allocated containers
   // when queuing of containers at the NMs is enabled.
@@ -427,7 +435,9 @@ public class ContainersMonitorImpl extends AbstractService implements
                     + " for the first time");
 
                 ResourceCalculatorProcessTree pt =
-                    ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
+                    ResourceCalculatorProcessTree.
+                        getResourceCalculatorProcessTree(
+                            pId, processTreeClass, conf);
                 ptInfo.setPid(pId);
                 ptInfo.setProcessTree(pt);
 
@@ -451,6 +461,7 @@ public class ContainersMonitorImpl extends AbstractService implements
             pTree.updateProcessTree();    // update process-tree
             long currentVmemUsage = pTree.getVirtualMemorySize();
             long currentPmemUsage = pTree.getRssMemorySize();
+
             // if machine has 6 cores and 3 are used,
             // cpuUsagePercentPerCore should be 300% and
             // cpuUsageTotalCoresPercentage should be 50%
@@ -557,10 +568,19 @@ public class ContainersMonitorImpl extends AbstractService implements
               trackingContainers.remove(containerId);
               LOG.info("Removed ProcessTree with root " + pId);
             }
+
+            ContainerImpl container =
+                (ContainerImpl) context.getContainers().get(containerId);
+            NMTimelinePublisher nmMetricsPublisher =
+                container.getNMTimelinePublisher();
+            if (nmMetricsPublisher != null) {
+              nmMetricsPublisher.reportContainerResourceUsage(container,
+                  currentPmemUsage, cpuUsagePercentPerCore);
+            }
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
-            LOG.warn("Uncaught exception in ContainerMemoryManager "
-                + "while managing memory of " + containerId, e);
+            LOG.warn("Uncaught exception in ContainersMonitorImpl "
+                + "while monitoring resource of " + containerId, e);
           }
         }
         if (LOG.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
new file mode 100644
index 0000000..f275b37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event posted to NMTimelinePublisher which in turn publishes it to
+ * timelineservice v2.
+ */
+public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
+  public NMTimelineEvent(NMTimelineEventType type) {
+    super(type);
+  }
+
+  public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
+    super(type, timestamp);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
new file mode 100644
index 0000000..b4ae45a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+/**
+ * Type of {@link NMTimelineEvent}.
+ */
+public enum NMTimelineEventType {
+  // Publish the NM Timeline entity
+  TIMELINE_ENTITY_PUBLISH,
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
new file mode 100644
index 0000000..c4d45a9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
@@ -0,0 +1,405 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ContainerLocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Metrics publisher service that publishes data to the timeline service v.2. It
+ * is used only if the timeline service v.2 is enabled and the system publishing
+ * of events and metrics is enabled.
+ */
+public class NMTimelinePublisher extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class);
+
+  private Dispatcher dispatcher;
+
+  private Context context;
+
+  private NodeId nodeId;
+
+  private String httpAddress;
+
+  private final Map<ApplicationId, TimelineClient> appToClientMap;
+
+  public NMTimelinePublisher(Context context) {
+    super(NMTimelinePublisher.class.getName());
+    this.context = context;
+    appToClientMap = new ConcurrentHashMap<>();
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    dispatcher = new AsyncDispatcher();
+    dispatcher.register(NMTimelineEventType.class,
+        new ForwardingEventHandler());
+    addIfService(dispatcher);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    // The context is only updated after ContainerManagerImpl has started,
+    // hence NMTimelinePublisher is added as a sub-service of that class.
+    this.nodeId = context.getNodeId();
+    this.httpAddress = nodeId.getHost() + ":" + context.getHttpPort();
+  }
+
+  @VisibleForTesting
+  Map<ApplicationId, TimelineClient> getAppToClientMap() {
+    return appToClientMap;
+  }
+
+  protected void handleNMTimelineEvent(NMTimelineEvent event) {
+    switch (event.getType()) {
+    case TIMELINE_ENTITY_PUBLISH:
+      putEntity(((TimelinePublishEvent) event).getTimelineEntityToPublish(),
+          ((TimelinePublishEvent) event).getApplicationId());
+      break;
+    default:
+      LOG.error("Unknown NMTimelineEvent type: " + event.getType());
+    }
+  }
+
+  public void reportContainerResourceUsage(Container container, Long pmemUsage,
+      Float cpuUsagePercentPerCore) {
+    if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE ||
+        cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
+      ContainerEntity entity =
+          createContainerEntity(container.getContainerId());
+      long currentTimeMillis = System.currentTimeMillis();
+      if (pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE) {
+        TimelineMetric memoryMetric = new TimelineMetric();
+        memoryMetric.setId(ContainerMetric.MEMORY.toString());
+        memoryMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+        memoryMetric.addValue(currentTimeMillis, pmemUsage);
+        entity.addMetric(memoryMetric);
+      }
+      if (cpuUsagePercentPerCore != ResourceCalculatorProcessTree.UNAVAILABLE) {
+        TimelineMetric cpuMetric = new TimelineMetric();
+        cpuMetric.setId(ContainerMetric.CPU.toString());
+        // TODO: support an average operation; SUM is used for CPU until then.
+        cpuMetric.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
+        cpuMetric.addValue(currentTimeMillis,
+            Math.round(cpuUsagePercentPerCore));
+        entity.addMetric(cpuMetric);
+      }
+      ApplicationId appId = container.getContainerId().getApplicationAttemptId()
+          .getApplicationId();
+      try {
+        // Sent straight to the timeline client instead of via this class's
+        // dispatcher, because the timeline client already does its own queuing.
+        TimelineClient timelineClient = getTimelineClient(appId);
+        if (timelineClient != null) {
+          timelineClient.putEntitiesAsync(entity);
+        } else {
+          LOG.error("Seems like client has been removed before the container"
+              + " metric could be published for " + container.getContainerId());
+        }
+      } catch (IOException | YarnException e) {
+        LOG.error("Failed to publish Container metrics for container "
+            + container.getContainerId(), e);
+      }
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerCreatedEvent(ContainerEvent event) {
+    ContainerId containerId = event.getContainerID();
+    ContainerEntity entity = createContainerEntity(containerId);
+    Container container = context.getContainers().get(containerId);
+    Resource resource = container.getResource();
+
+    Map<String, Object> entityInfo = new HashMap<String, Object>();
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+        resource.getMemory());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+        resource.getVirtualCores());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+        nodeId.getHost());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+        nodeId.getPort());
+    entityInfo.put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        container.getPriority().toString());
+    entityInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        httpAddress);
+    entity.setInfo(entityInfo);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    tEvent.setTimestamp(event.getTimestamp());
+
+    entity.addEvent(tEvent);
+    entity.setCreatedTime(event.getTimestamp());
+    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+        containerId.getApplicationAttemptId().getApplicationId()));
+  }
+
+  @SuppressWarnings("unchecked")
+  private void publishContainerFinishedEvent(ContainerStatus containerStatus,
+      long timeStamp) {
+    ContainerId containerId = containerStatus.getContainerId();
+    TimelineEntity entity = createContainerEntity(containerId);
+
+    Map<String, Object> eventInfo = new HashMap<String, Object>();
+    eventInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        containerStatus.getDiagnostics());
+    eventInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        containerStatus.getExitStatus());
+    eventInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO, containerStatus
+        .getState().toString());
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+    tEvent.setTimestamp(timeStamp);
+    tEvent.setInfo(eventInfo);
+
+    entity.addEvent(tEvent);
+
+    dispatcher.getEventHandler().handle(new TimelinePublishEvent(entity,
+        containerId.getApplicationAttemptId().getApplicationId()));
+  }
+
+  private void publishContainerLocalizationEvent(
+      ContainerLocalizationEvent event, String eventType) {
+    Container container = event.getContainer();
+    ContainerId containerId = container.getContainerId();
+    TimelineEntity entity = createContainerEntity(containerId);
+
+    TimelineEvent tEvent = new TimelineEvent();
+    tEvent.setId(eventType);
+    tEvent.setTimestamp(event.getTimestamp());
+    entity.addEvent(tEvent);
+
+    ApplicationId appId =
+        container.getContainerId().getApplicationAttemptId().getApplicationId();
+    try {
+      // Sent straight to the timeline client instead of via this class's
+      // dispatcher, because the timeline client already does its own queuing.
+      TimelineClient timelineClient = getTimelineClient(appId);
+      if (timelineClient != null) {
+        timelineClient.putEntitiesAsync(entity);
+      } else {
+        LOG.error("Seems like client has been removed before the event could be"
+            + " published for " + container.getContainerId());
+      }
+    } catch (IOException | YarnException e) {
+      LOG.error("Failed to publish localization event " + eventType
+          + " for container " + container.getContainerId(), e);
+    }
+  }
+
+  private static ContainerEntity createContainerEntity(
+      ContainerId containerId) {
+    ContainerEntity entity = new ContainerEntity();
+    entity.setId(containerId.toString());
+    Identifier parentIdentifier = new Identifier();
+    parentIdentifier
+        .setType(TimelineEntityType.YARN_APPLICATION_ATTEMPT.name());
+    parentIdentifier.setId(containerId.getApplicationAttemptId().toString());
+    entity.setParent(parentIdentifier);
+    return entity;
+  }
+
+  private void putEntity(TimelineEntity entity, ApplicationId appId) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      TimelineClient timelineClient = getTimelineClient(appId);
+      if (timelineClient != null) {
+        timelineClient.putEntities(entity);
+      } else {
+        LOG.error("Seems like client has been removed before the entity "
+            + "could be published for " + entity);
+      }
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity " + entity, e);
+    }
+  }
+
+  public void publishApplicationEvent(ApplicationEvent event) {
+    // Publish only when an event type of interest is received.
+    switch (event.getType()) {
+    case INIT_APPLICATION:
+    case FINISH_APPLICATION:
+    case APPLICATION_LOG_HANDLING_FAILED:
+      // TODO: handle these in the future; it is not yet decided
+      // which entity they should be published under.
+      break;
+    case APPLICATION_CONTAINER_FINISHED:
+      // Published as the container's finished event, not as an app event.
+      ApplicationContainerFinishedEvent evnt =
+          (ApplicationContainerFinishedEvent) event;
+      publishContainerFinishedEvent(evnt.getContainerStatus(),
+          event.getTimestamp());
+      break;
+
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(event.getType() + " is not a desired ApplicationEvent which"
+            + " needs to be published by NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  public void publishContainerEvent(ContainerEvent event) {
+    // publish only when the desired event is received
+    switch (event.getType()) {
+    case INIT_CONTAINER:
+      publishContainerCreatedEvent(event);
+      break;
+
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(event.getType()
+            + " is not a desired ContainerEvent which needs to be published by"
+            + " NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  public void publishLocalizationEvent(LocalizationEvent event) {
+    // publish only when the desired event is received
+    switch (event.getType()) {
+    case CONTAINER_RESOURCES_LOCALIZED:
+      publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+          ContainerMetricsConstants.LOCALIZATION_FINISHED_EVENT_TYPE);
+      break;
+    case INIT_CONTAINER_RESOURCES:
+      publishContainerLocalizationEvent((ContainerLocalizationEvent) event,
+          ContainerMetricsConstants.LOCALIZATION_START_EVENT_TYPE);
+      break;
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(event.getType()
+            + " is not a desired LocalizationEvent which needs to be published"
+            + " by NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  /**
+   * EventHandler implementation which forwards events to NMTimelinePublisher.
+   * Making use of it, NMTimelinePublisher can avoid having a public handle
+   * method.
+   */
+  private final class ForwardingEventHandler implements
+      EventHandler<NMTimelineEvent> {
+
+    @Override
+    public void handle(NMTimelineEvent event) {
+      handleNMTimelineEvent(event);
+    }
+  }
+
+  private static class TimelinePublishEvent extends NMTimelineEvent {
+    private ApplicationId appId;
+    private TimelineEntity entityToPublish;
+
+    public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
+      super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH, System
+          .currentTimeMillis());
+      this.appId = appId;
+      this.entityToPublish = entity;
+    }
+
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+
+    public TimelineEntity getTimelineEntityToPublish() {
+      return entityToPublish;
+    }
+  }
+
+  public void createTimelineClient(ApplicationId appId) {
+    if (!appToClientMap.containsKey(appId)) {
+      TimelineClient timelineClient =
+          TimelineClient.createTimelineClient(appId);
+      timelineClient.init(getConfig());
+      timelineClient.start();
+      appToClientMap.put(appId, timelineClient);
+    }
+  }
+
+  public void stopTimelineClient(ApplicationId appId) {
+    TimelineClient client = appToClientMap.remove(appId);
+    if (client != null) {
+      client.stop();
+    }
+  }
+
+  public void setTimelineServiceAddress(ApplicationId appId,
+      String collectorAddr) {
+    TimelineClient client = appToClientMap.get(appId);
+    if (client != null) {
+      client.setTimelineServiceAddress(collectorAddr);
+    }
+  }
+
+  private TimelineClient getTimelineClient(ApplicationId appId) {
+    return appToClientMap.get(appId);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java
new file mode 100644
index 0000000..66233fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/package-info.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.timelineservice contains
+ * classes related to publishing container events and other NM lifecycle events
+ * to ATSv2.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
index a9ff83c..3b84a78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
@@ -81,7 +81,7 @@ public class TestEventFlow {
     
     Context context = new NMContext(new NMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInNM(), null, null,
-        new NMNullStateStoreService(), false) {
+        new NMNullStateStoreService(), false, conf) {
       @Override
       public int getHttpPort() {
         return 1234;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 7975f23..e7d18b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -1215,6 +1215,7 @@ public class TestNodeStatusUpdater {
         BuilderUtils.newContainerToken(containerId, "host", 1234, "user",
             BuilderUtils.newResource(1024, 1), 0, 123,
             "password".getBytes(), 0);
+
     Container completedContainer = new ContainerImpl(conf, null,
         null, null, null,
         BuilderUtils.newContainerTokenIdentifier(containerToken),
@@ -1704,9 +1705,10 @@ public class TestNodeStatusUpdater {
       protected NMContext createNMContext(
           NMContainerTokenSecretManager containerTokenSecretManager,
           NMTokenSecretManagerInNM nmTokenSecretManager,
-          NMStateStoreService store, boolean isDistributedSchedulingEnabled) {
+          NMStateStoreService store, boolean isDistributedSchedulingEnabled,
+          Configuration config) {
         return new MyNMContext(containerTokenSecretManager,
-          nmTokenSecretManager);
+          nmTokenSecretManager, config);
       }
     };
 
@@ -1937,9 +1939,9 @@ public class TestNodeStatusUpdater {
 
     public MyNMContext(
         NMContainerTokenSecretManager containerTokenSecretManager,
-        NMTokenSecretManagerInNM nmTokenSecretManager) {
+        NMTokenSecretManagerInNM nmTokenSecretManager, Configuration conf) {
       super(containerTokenSecretManager, nmTokenSecretManager, null, null,
-          new NMNullStateStoreService(), false);
+          new NMNullStateStoreService(), false, conf);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 4f726d4..f716d44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
@@ -617,6 +618,11 @@ public abstract class BaseAMRMProxyTest {
     }
 
     @Override
+    public Map<ApplicationId, String> getRegisteredCollectors() {
+      return null;
+    }
+
+    @Override
     public ConcurrentMap<ContainerId, Container> getContainers() {
       return null;
     }
@@ -667,6 +673,11 @@ public abstract class BaseAMRMProxyTest {
     }
 
     @Override
+    public Configuration getConf() {
+      return null;
+    }
+
+    @Override
     public void setDecommissioned(boolean isDecommissioned) {
     }
 
@@ -698,5 +709,13 @@ public abstract class BaseAMRMProxyTest {
     public OpportunisticContainerAllocator getContainerAllocator() {
       return null;
     }
+
+    public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+    }
+
+    @Override
+    public NMTimelinePublisher getNMTimelinePublisher() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
index 4f0e5c3..726b353 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
@@ -119,7 +119,8 @@ public abstract class BaseContainerManagerTest {
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
+      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
+      conf) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index b7d0e48..32dddae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.isA;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -50,7 +50,6 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
@@ -95,12 +94,12 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
-import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
@@ -559,7 +558,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
       NMStateStoreService stateStore) {
     NMContext context = new NMContext(new NMContainerTokenSecretManager(
         conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore, false){
+        new ApplicationACLsManager(conf), stateStore, false, conf) {
       public int getHttpPort() {
         return HTTP_PORT;
       }
@@ -722,6 +721,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
               boolean blockNewContainerRequests) {
             // do nothing
           }
+
+          @Override
+          public NMTimelinePublisher
+              createNMTimelinePublisher(Context context) {
+            return null;
+          }
     };
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
index 157ba97..05ea036 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
@@ -34,7 +34,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -539,7 +541,8 @@ public class TestApplication {
         new ApplicationACLsManager(conf));
       when(context.getNMTokenSecretManager()).thenReturn(nmTokenSecretMgr);
       when(context.getNMStateStore()).thenReturn(stateStoreService);
-      
+      when(context.getConf()).thenReturn(conf);
+
       // Setting master key
       MasterKey masterKey = new MasterKeyPBImpl();
       masterKey.setKeyId(123);
@@ -550,7 +553,8 @@ public class TestApplication {
       this.user = user;
       this.appId = BuilderUtils.newApplicationId(timestamp, id);
 
-      app = new ApplicationImpl(dispatcher, this.user, appId, null, context);
+      app = new ApplicationImpl(
+          dispatcher, this.user, appId, null, context);
       containers = new ArrayList<Container>();
       for (int i = 0; i < numContainers; i++) {
         Container container = createMockedContainer(this.appId, i);
@@ -597,7 +601,7 @@ public class TestApplication {
 
     public void containerFinished(int containerNum) {
       app.handle(new ApplicationContainerFinishedEvent(containers.get(
-          containerNum).getContainerId()));
+          containerNum).cloneAndGetContainerStatus()));
       drainDispatcherEvents();
     }
 
@@ -641,6 +645,9 @@ public class TestApplication {
     when(c.getLaunchContext()).thenReturn(launchContext);
     when(launchContext.getApplicationACLs()).thenReturn(
         new HashMap<ApplicationAccessType, String>());
+    when(c.cloneAndGetContainerStatus()).thenReturn(
+        BuilderUtils.newContainerStatus(cId,
+            ContainerState.NEW, "", 0, Resource.newInstance(1024, 1)));
     return c;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
index beb9375..6b32dd9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
@@ -99,7 +99,6 @@ import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.hamcrest.CoreMatchers;
@@ -111,15 +110,17 @@ import org.junit.Test;
 public class TestContainerLaunch extends BaseContainerManagerTest {
 
   private static final String INVALID_JAVA_HOME = "/no/jvm/here";
-  protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
-    conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService(), false) {
-    public int getHttpPort() {
-      return HTTP_PORT;
-    };
-    public NodeId getNodeId() {
-      return NodeId.newInstance("ahost", 1234);
-    };
+  private Context distContext =
+      new NMContext(new NMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInNM(), null,
+          new ApplicationACLsManager(conf), new NMNullStateStoreService(),
+          false, conf) {
+        public int getHttpPort() {
+          return HTTP_PORT;
+        };
+        public NodeId getNodeId() {
+          return NodeId.newInstance("ahost", 1234);
+        };
   };
 
   public TestContainerLaunch() throws UnsupportedFileSystemException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
index c768df1..df00f9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -82,7 +82,7 @@ public class TestLocalCacheDirectoryManager {
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
           new ApplicationACLsManager(conf), new NMNullStateStoreService(),
-            false);
+            false, conf);
     ResourceLocalizationService service =
         new ResourceLocalizationService(null, null, null, null, nmContext);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index f594d8c..23786fe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -186,7 +186,8 @@ public class TestResourceLocalizationService {
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     nmContext = new NMContext(new NMContainerTokenSecretManager(
       conf), new NMTokenSecretManagerInNM(), null,
-      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false);
+      new ApplicationACLsManager(conf), new NMNullStateStoreService(), false,
+          conf);
   }
 
   @After
@@ -2369,7 +2370,7 @@ public class TestResourceLocalizationService {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), stateStore, false);
+          new ApplicationACLsManager(conf), stateStore, false, conf);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
                                       dirsHandler, nmContext);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb0a24ca/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
index f929ca8..88d9688 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestAppLogAggregatorImpl.java
@@ -393,7 +393,7 @@ public class TestAppLogAggregatorImpl {
         new NMTokenSecretManagerInNM(),
         null,
         new ApplicationACLsManager(conf),
-        new NMNullStateStoreService(), false);
+        new NMNullStateStoreService(), false, conf);
   }
 
   private static final class AppLogAggregatorInTest extends
@@ -431,4 +431,4 @@ public class TestAppLogAggregatorImpl {
       return spy(new LogWriter(conf, remoteAppLogFile, ugi));
     }
   }
-}
\ No newline at end of file
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message