hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sj...@apache.org
Subject [24/30] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date Thu, 14 Jul 2016 16:49:26 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 0a9895e..d0c1198 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -19,17 +19,23 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
@@ -52,6 +58,8 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private Set<NodeLabel> labels = null;
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
+  private Map<ApplicationId, String> registeredCollectors = null;
+
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
   }
@@ -106,6 +114,9 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
+    if (this.registeredCollectors != null) {
+      addRegisteredCollectorsToProto();
+    }
   }
 
   private void addLogAggregationStatusForAppsToProto() {
@@ -147,6 +158,17 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((LogAggregationReportPBImpl) value).getProto();
   }
 
+  /**
+   * Replaces the builder's registered-collectors entries with the contents
+   * of the locally cached {@code registeredCollectors} map.
+   */
+  private void addRegisteredCollectorsToProto() {
+    maybeInitBuilder();
+    builder.clearRegisteredCollectors();
+    for (Map.Entry<ApplicationId, String> e : registeredCollectors.entrySet()) {
+      AppCollectorsMapProto.Builder item = AppCollectorsMapProto.newBuilder();
+      item.setAppId(convertToProtoFormat(e.getKey()));
+      item.setAppCollectorAddr(e.getValue());
+      builder.addRegisteredCollectors(item);
+    }
+  }
+
   private void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
@@ -228,6 +250,38 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     this.lastKnownNMTokenMasterKey = masterKey;
   }
 
+  /**
+   * Returns the application-to-collector-address map carried by this
+   * request, decoding it from the proto on first access.
+   *
+   * @return the cached map, or {@code null} when none was set and the
+   *         proto holds no registered-collector entries
+   */
+  @Override
+  public Map<ApplicationId, String> getRegisteredCollectors() {
+    if (this.registeredCollectors == null) {
+      initRegisteredCollectors();
+    }
+    return this.registeredCollectors;
+  }
+
+  /**
+   * Populates {@code registeredCollectors} from the proto or builder,
+   * whichever currently backs this record. When the message holds no
+   * entries the field is left untouched.
+   */
+  private void initRegisteredCollectors() {
+    NodeHeartbeatRequestProtoOrBuilder source = viaProto ? proto : builder;
+    List<AppCollectorsMapProto> entries = source.getRegisteredCollectorsList();
+    if (entries.isEmpty()) {
+      return;
+    }
+    this.registeredCollectors = new HashMap<>();
+    for (AppCollectorsMapProto entry : entries) {
+      ApplicationId key = convertFromProtoFormat(entry.getAppId());
+      this.registeredCollectors.put(key, entry.getAppCollectorAddr());
+    }
+  }
+
+  /**
+   * Stores a private copy of the given application-to-collector-address map.
+   * A {@code null} or empty argument is ignored, leaving any earlier value
+   * in place.
+   *
+   * @param registeredCollectors the mapping to copy
+   */
+  @Override
+  public void setRegisteredCollectors(
+      Map<ApplicationId, String> registeredCollectors) {
+    boolean hasEntries =
+        registeredCollectors != null && !registeredCollectors.isEmpty();
+    if (hasEntries) {
+      maybeInitBuilder();
+      this.registeredCollectors = new HashMap<ApplicationId, String>();
+      this.registeredCollectors.putAll(registeredCollectors);
+    }
+  }
+
   private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
     return new NodeStatusPBImpl(p);
   }
@@ -236,6 +290,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((NodeStatusPBImpl)t).getProto();
   }
 
+  /** Wraps an {@link ApplicationIdProto} in its record implementation. */
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    ApplicationIdPBImpl wrapped = new ApplicationIdPBImpl(p);
+    return wrapped;
+  }
+
+  /**
+   * Extracts the proto message from an application id. The argument is cast
+   * to {@link ApplicationIdPBImpl}, so any other implementation fails with a
+   * {@link ClassCastException}.
+   */
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    ApplicationIdPBImpl impl = (ApplicationIdPBImpl) t;
+    return impl.getProto();
+  }
+
   private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
     return new MasterKeyPBImpl(p);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 3422697..cd85241 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueui
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -68,6 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
   private Resource resource = null;
+  private Map<ApplicationId, String> appCollectorsMap = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -125,6 +127,9 @@ public class NodeHeartbeatResponsePBImpl extends
     if (this.resource != null) {
       builder.setResource(convertToProtoFormat(this.resource));
     }
+    if (this.appCollectorsMap != null) {
+      addAppCollectorsMapToProto();
+    }
   }
 
   private void addSystemCredentialsToProto() {
@@ -138,6 +143,16 @@ public class NodeHeartbeatResponsePBImpl extends
     }
   }
 
+  /**
+   * Rewrites the builder's app-collectors entries from the locally cached
+   * {@code appCollectorsMap}, discarding whatever the builder held before.
+   */
+  private void addAppCollectorsMapToProto() {
+    maybeInitBuilder();
+    builder.clearAppCollectorsMap();
+    for (Map.Entry<ApplicationId, String> e : appCollectorsMap.entrySet()) {
+      AppCollectorsMapProto.Builder item = AppCollectorsMapProto.newBuilder();
+      item.setAppId(convertToProtoFormat(e.getKey()));
+      item.setAppCollectorAddr(e.getValue());
+      builder.addAppCollectorsMap(item);
+    }
+  }
+
   private void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
@@ -551,6 +566,15 @@ public class NodeHeartbeatResponsePBImpl extends
     return systemCredentials;
   }
 
+  /**
+   * Returns the application-to-collector-address map carried by this
+   * response, decoding it from the proto on first access.
+   *
+   * @return the cached map, or {@code null} when none was set and the
+   *         proto holds no app-collector entries
+   */
+  @Override
+  public Map<ApplicationId, String> getAppCollectorsMap() {
+    if (this.appCollectorsMap == null) {
+      initAppCollectorsMap();
+    }
+    return this.appCollectorsMap;
+  }
+
   private void initSystemCredentials() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
@@ -562,6 +586,18 @@ public class NodeHeartbeatResponsePBImpl extends
     }
   }
 
+  /**
+   * Populates {@code appCollectorsMap} from the proto or builder, whichever
+   * currently backs this record. When the message holds no entries the
+   * field is left untouched.
+   */
+  private void initAppCollectorsMap() {
+    NodeHeartbeatResponseProtoOrBuilder source = viaProto ? proto : builder;
+    List<AppCollectorsMapProto> entries = source.getAppCollectorsMapList();
+    if (entries.isEmpty()) {
+      return;
+    }
+    this.appCollectorsMap = new HashMap<>();
+    for (AppCollectorsMapProto entry : entries) {
+      ApplicationId key = convertFromProtoFormat(entry.getAppId());
+      this.appCollectorsMap.put(key, entry.getAppCollectorAddr());
+    }
+  }
+
   @Override
   public void setSystemCredentialsForApps(
       Map<ApplicationId, ByteBuffer> systemCredentials) {
@@ -574,6 +610,17 @@ public class NodeHeartbeatResponsePBImpl extends
   }
 
   @Override
+  public void setAppCollectorsMap(
+      Map<ApplicationId, String> appCollectorsMap) {
+    if (appCollectorsMap == null || appCollectorsMap.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.appCollectorsMap = new HashMap<ApplicationId, String>();
+    this.appCollectorsMap.putAll(appCollectorsMap);
+  }
+
+  @Override
   public long getNextHeartBeatInterval() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     return (p.getNextHeartBeatInterval());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
new file mode 100644
index 0000000..c6f6619
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorsMapPBImpl;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ReportNewCollectorInfoRequest}.
+ *
+ * <p>The collector list is cached locally once it has been read or set, and
+ * is written back into the builder whenever {@link #getProto()} is called.
+ */
+public class ReportNewCollectorInfoRequestPBImpl extends
+    ReportNewCollectorInfoRequest {
+
+  private ReportNewCollectorInfoRequestProto proto =
+      ReportNewCollectorInfoRequestProto.getDefaultInstance();
+
+  private ReportNewCollectorInfoRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private List<AppCollectorsMap> collectorsList = null;
+
+  /** Creates an empty request backed by a fresh builder. */
+  public ReportNewCollectorInfoRequestPBImpl() {
+    builder = ReportNewCollectorInfoRequestProto.newBuilder();
+  }
+
+  /**
+   * Creates a request that reads from an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public ReportNewCollectorInfoRequestPBImpl(
+      ReportNewCollectorInfoRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Flushes locally cached state into the builder and returns the built
+   * message.
+   *
+   * @return the proto message reflecting the current state of this record
+   */
+  public ReportNewCollectorInfoRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Compares the underlying proto messages of two instances of exactly this
+   * class.
+   *
+   * @param other the object to compare against
+   * @return {@code true} only when {@code other} has the same runtime class
+   *         and an equal proto; {@code false} for {@code null} and for any
+   *         other type
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Require an exact class match: an instance of a supertype (for example
+    // a plain Object) cannot be cast to this class, so it has to be rejected
+    // here instead of reaching the cast below.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  /** Pushes local state into the builder and rebuilds {@code proto}. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies the cached collector list, if any, into the builder. */
+  private void mergeLocalToBuilder() {
+    if (collectorsList != null) {
+      addLocalCollectorsToProto();
+    }
+  }
+
+  /**
+   * Ensures a builder exists, seeding it from the current {@code proto} when
+   * this record was reading from the proto or had no builder yet, and marks
+   * the builder as the active backing store.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ReportNewCollectorInfoRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Replaces the builder's collector entries with the cached list. */
+  private void addLocalCollectorsToProto() {
+    maybeInitBuilder();
+    builder.clearAppCollectors();
+    List<AppCollectorsMapProto> protoList =
+        new ArrayList<AppCollectorsMapProto>();
+    for (AppCollectorsMap m : this.collectorsList) {
+      protoList.add(convertToProtoFormat(m));
+    }
+    builder.addAllAppCollectors(protoList);
+  }
+
+  /**
+   * Decodes the collector entries of the active proto or builder into a new
+   * local list (empty when the message has no entries).
+   */
+  private void initLocalCollectorsList() {
+    ReportNewCollectorInfoRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppCollectorsMapProto> list =
+        p.getAppCollectorsList();
+    this.collectorsList = new ArrayList<AppCollectorsMap>();
+    for (AppCollectorsMapProto m : list) {
+      this.collectorsList.add(convertFromProtoFormat(m));
+    }
+  }
+
+  /**
+   * Returns the collector list, decoding it from the proto on first access.
+   *
+   * @return the cached list; the same instance is returned on later calls
+   */
+  @Override
+  public List<AppCollectorsMap> getAppCollectorsList() {
+    if (this.collectorsList == null) {
+      initLocalCollectorsList();
+    }
+    return this.collectorsList;
+  }
+
+  /**
+   * Sets the collector list. The given list is stored by reference, not
+   * copied; passing {@code null} also clears the entries in the builder.
+   *
+   * @param appCollectorsList the list to store, or {@code null} to clear
+   */
+  @Override
+  public void setAppCollectorsList(List<AppCollectorsMap> appCollectorsList) {
+    maybeInitBuilder();
+    if (appCollectorsList == null) {
+      builder.clearAppCollectors();
+    }
+    this.collectorsList = appCollectorsList;
+  }
+
+  /** Wraps a proto entry in its record implementation. */
+  private AppCollectorsMapPBImpl convertFromProtoFormat(
+      AppCollectorsMapProto p) {
+    return new AppCollectorsMapPBImpl(p);
+  }
+
+  /**
+   * Extracts the proto from a record; the record is cast to
+   * {@link AppCollectorsMapPBImpl}.
+   */
+  private AppCollectorsMapProto convertToProtoFormat(
+      AppCollectorsMap m) {
+    return ((AppCollectorsMapPBImpl) m).getProto();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
new file mode 100644
index 0000000..5f2a10a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoResponsePBImpl.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol-buffer backed implementation of
+ * {@link ReportNewCollectorInfoResponse}. The class keeps no local state
+ * beyond the proto message and its builder.
+ */
+@Private
+@Unstable
+public class ReportNewCollectorInfoResponsePBImpl extends
+    ReportNewCollectorInfoResponse {
+
+  private ReportNewCollectorInfoResponseProto proto =
+      ReportNewCollectorInfoResponseProto.getDefaultInstance();
+
+  private ReportNewCollectorInfoResponseProto.Builder builder = null;
+
+  private boolean viaProto = false;
+
+  /** Creates an empty response backed by a fresh builder. */
+  public ReportNewCollectorInfoResponsePBImpl() {
+    builder = ReportNewCollectorInfoResponseProto.newBuilder();
+  }
+
+  /**
+   * Creates a response that reads from an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public ReportNewCollectorInfoResponsePBImpl(
+      ReportNewCollectorInfoResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Returns the proto message, building it from the builder if this record
+   * is not already reading from a built message.
+   *
+   * @return the proto message for this record
+   */
+  public ReportNewCollectorInfoResponseProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Compares the underlying proto messages of two instances of exactly this
+   * class.
+   *
+   * @param other the object to compare against
+   * @return {@code true} only when {@code other} has the same runtime class
+   *         and an equal proto; {@code false} for {@code null} and for any
+   *         other type
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Require an exact class match: an instance of a supertype (for example
+    // a plain Object) cannot be cast to this class, so it has to be rejected
+    // here instead of reaching the cast below.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  /** Renders the proto with {@code TextFormat.shortDebugString}. */
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
new file mode 100644
index 0000000..07e1d92
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorsMap.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+
+/**
+ * Record pairing an {@link ApplicationId} with a collector address string.
+ */
+@Private
+public abstract class AppCollectorsMap {
+
+  /**
+   * Creates a record through {@link Records#newRecord(Class)} and fills in
+   * both fields.
+   *
+   * @param id the application id to store
+   * @param collectorAddr the collector address to store
+   * @return the populated record
+   */
+  public static AppCollectorsMap newInstance(
+      ApplicationId id, String collectorAddr) {
+    AppCollectorsMap mapping = Records.newRecord(AppCollectorsMap.class);
+    mapping.setApplicationId(id);
+    mapping.setCollectorAddr(collectorAddr);
+    return mapping;
+  }
+
+  /** @return the application id held by this record */
+  public abstract ApplicationId getApplicationId();
+
+  /** @param id the application id to store */
+  public abstract void setApplicationId(ApplicationId id);
+
+  /** @return the collector address held by this record */
+  public abstract String getCollectorAddr();
+
+  /** @param addr the collector address to store */
+  public abstract void setCollectorAddr(String addr);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
new file mode 100644
index 0000000..3740035
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorsMapPBImpl.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorsMapProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol-buffer backed implementation of {@link AppCollectorsMap}.
+ *
+ * <p>The application id and collector address are cached locally once read
+ * or set, and are written into the builder whenever {@link #getProto()} is
+ * called.
+ */
+@Private
+@Unstable
+public class AppCollectorsMapPBImpl extends AppCollectorsMap {
+
+  private AppCollectorsMapProto proto =
+      AppCollectorsMapProto.getDefaultInstance();
+
+  private AppCollectorsMapProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private ApplicationId appId = null;
+  private String collectorAddr = null;
+
+  /** Creates an empty record backed by a fresh builder. */
+  public AppCollectorsMapPBImpl() {
+    builder = AppCollectorsMapProto.newBuilder();
+  }
+
+  /**
+   * Creates a record that reads from an existing proto message.
+   *
+   * @param proto the message to wrap
+   */
+  public AppCollectorsMapPBImpl(AppCollectorsMapProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  /**
+   * Flushes locally cached fields into the builder and returns the built
+   * message.
+   *
+   * @return the proto message reflecting the current state of this record
+   */
+  public AppCollectorsMapProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  /**
+   * Compares the underlying proto messages of two instances of exactly this
+   * class.
+   *
+   * @param other the object to compare against
+   * @return {@code true} only when {@code other} has the same runtime class
+   *         and an equal proto; {@code false} for {@code null} and for any
+   *         other type
+   */
+  @Override
+  public boolean equals(Object other) {
+    // Require an exact class match: an instance of a supertype (for example
+    // a plain Object) cannot be cast to this class, so it has to be rejected
+    // here instead of reaching the cast below.
+    if (other == null || other.getClass() != this.getClass()) {
+      return false;
+    }
+    return this.getProto().equals(this.getClass().cast(other).getProto());
+  }
+
+  /** Renders the proto with {@code TextFormat.shortDebugString}. */
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  /**
+   * Returns the application id, decoding it from the proto on first access.
+   *
+   * @return the cached id, or {@code null} when none was set and the proto
+   *         has no app id
+   */
+  @Override
+  public ApplicationId getApplicationId() {
+    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.appId == null && p.hasAppId()) {
+      this.appId = convertFromProtoFormat(p.getAppId());
+    }
+    return this.appId;
+  }
+
+  /**
+   * Returns the collector address, reading it from the proto on first
+   * access.
+   *
+   * @return the cached address, or {@code null} when none was set and the
+   *         proto has no collector address
+   */
+  @Override
+  public String getCollectorAddr() {
+    AppCollectorsMapProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorAddr == null
+        && p.hasAppCollectorAddr()) {
+      this.collectorAddr = p.getAppCollectorAddr();
+    }
+    return this.collectorAddr;
+  }
+
+  /**
+   * Sets the application id; {@code null} also clears the field in the
+   * builder.
+   *
+   * @param id the application id to store, or {@code null} to clear
+   */
+  @Override
+  public void setApplicationId(ApplicationId id) {
+    maybeInitBuilder();
+    if (id == null) {
+      builder.clearAppId();
+    }
+    this.appId = id;
+  }
+
+  /**
+   * Sets the collector address; {@code null} also clears the field in the
+   * builder.
+   *
+   * @param collectorAddr the address to store, or {@code null} to clear
+   */
+  @Override
+  public void setCollectorAddr(String collectorAddr) {
+    maybeInitBuilder();
+    if (collectorAddr == null) {
+      builder.clearAppCollectorAddr();
+    }
+    this.collectorAddr = collectorAddr;
+  }
+
+  /** Wraps an {@link ApplicationIdProto} in its record implementation. */
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  /**
+   * Extracts the proto from an application id; the id is cast to
+   * {@link ApplicationIdPBImpl}.
+   */
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
+
+  /**
+   * Ensures a builder exists, seeding it from the current {@code proto} when
+   * this record was reading from the proto or had no builder yet, and marks
+   * the builder as the active backing store.
+   */
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = AppCollectorsMapProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  /** Pushes local state into the builder and rebuilds {@code proto}. */
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  /** Copies each non-null cached field into the builder. */
+  private void mergeLocalToBuilder() {
+    if (this.appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+    if (this.collectorAddr != null) {
+      builder.setAppCollectorAddr(this.collectorAddr);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
index 0d5540d..eadb5b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/metrics/ContainerMetricsConstants.java
@@ -27,10 +27,20 @@ public class ContainerMetricsConstants {
 
   public static final String ENTITY_TYPE = "YARN_CONTAINER";
 
+  // Event of this type will be emitted by NM.
   public static final String CREATED_EVENT_TYPE = "YARN_CONTAINER_CREATED";
 
+  // Event of this type will be emitted by RM.
+  public static final String CREATED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_CREATED";
+
+  // Event of this type will be emitted by NM.
   public static final String FINISHED_EVENT_TYPE = "YARN_CONTAINER_FINISHED";
 
+  // Event of this type will be emitted by RM.
+  public static final String FINISHED_IN_RM_EVENT_TYPE =
+      "YARN_RM_CONTAINER_FINISHED";
+
   public static final String PARENT_PRIMARIY_FILTER = "YARN_CONTAINER_PARENT";
 
   public static final String ALLOCATED_MEMORY_ENTITY_INFO =
@@ -59,4 +69,12 @@ public class ContainerMetricsConstants {
 
   public static final String ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO =
       "YARN_CONTAINER_ALLOCATED_HOST_HTTP_ADDRESS";
+
+  // Event of this type will be emitted by NM.
+  public static final String LOCALIZATION_START_EVENT_TYPE =
+      "YARN_NM_CONTAINER_LOCALIZATION_STARTED";
+
+  // Event of this type will be emitted by NM.
+  public static final String LOCALIZATION_FINISHED_EVENT_TYPE =
+      "YARN_NM_CONTAINER_LOCALIZATION_FINISHED";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
new file mode 100644
index 0000000..8665274
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/collectornodemanager_protocol.proto
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+option java_package = "org.apache.hadoop.yarn.proto";
+option java_outer_classname = "CollectorNodemanagerProtocol";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.yarn;
+
+import "yarn_server_common_service_protos.proto";
+
+service CollectorNodemanagerProtocolService {  // RPC service; its request/response messages come from the import above
+  rpc reportNewCollectorInfo (ReportNewCollectorInfoRequestProto) returns (ReportNewCollectorInfoResponseProto);
+  rpc getTimelineCollectorContext (GetTimelineCollectorContextRequestProto) returns (GetTimelineCollectorContextResponseProto);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 3e3cb82..3660252 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -84,6 +84,7 @@ message NodeHeartbeatRequestProto {
   optional MasterKeyProto last_known_nm_token_master_key = 3;
   optional NodeLabelsProto nodeLabels = 4;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 5;
+  repeated AppCollectorsMapProto registered_collectors = 6;
 }
 
 message LogAggregationReportProto {
@@ -108,6 +109,7 @@ message NodeHeartbeatResponseProto {
   repeated SignalContainerRequestProto containers_to_signal = 13;
   optional ResourceProto resource = 14;
   optional ContainerQueuingLimitProto container_queuing_limit = 15;
+  repeated AppCollectorsMapProto app_collectors_map = 16;
 }
 
 message ContainerQueuingLimitProto {
@@ -120,6 +122,35 @@ message SystemCredentialsForAppsProto {
   optional bytes credentialsForApp = 2;
 }
 
+////////////////////////////////////////////////////////////////////////
+////// From collector_nodemanager_protocol ////////////////////////////
+////////////////////////////////////////////////////////////////////////
+message AppCollectorsMapProto {
+  optional ApplicationIdProto appId = 1;
+  optional string appCollectorAddr = 2;
+}
+
+//////////////////////////////////////////////////////
+/////// collector_nodemanager_protocol //////////////
+//////////////////////////////////////////////////////
+message ReportNewCollectorInfoRequestProto {
+  repeated AppCollectorsMapProto app_collectors = 1;
+}
+
+message ReportNewCollectorInfoResponseProto {
+}
+
+message GetTimelineCollectorContextRequestProto {
+  optional ApplicationIdProto appId = 1;
+}
+
+message GetTimelineCollectorContextResponseProto {
+  optional string user_id = 1;
+  optional string flow_name = 2;
+  optional string flow_version = 3;
+  optional int64 flow_run_id = 4;
+}
+
 message NMContainerStatusProto {
   optional ContainerIdProto container_id = 1;
   optional ContainerStateProto container_state = 2;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
new file mode 100644
index 0000000..e25f528
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -0,0 +1,416 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.util.Records;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRPC {
+
+  private static final String EXCEPTION_MSG = "test error";
+  private static final String EXCEPTION_CAUSE = "exception cause";
+  private static final RecordFactory RECORD_FACTORY =
+      RecordFactoryProvider.getRecordFactory(null);
+
+  public static final String ILLEGAL_NUMBER_MESSAGE =
+      "collectors' number in ReportNewCollectorInfoRequest is not ONE.";
+
+  public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+
+  public static final ApplicationId DEFAULT_APP_ID =
+      ApplicationId.newInstance(0, 0);
+
+  @Test
+  public void testUnknownCall() {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(ContainerManagementProtocol.class,
+        new DummyContainerManager(), addr, conf, null, 1);
+    server.start();
+
+    // Any unrelated protocol would do
+    ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
+        ApplicationClientProtocol.class, NetUtils.getConnectAddress(server),
+        conf);
+
+    try {
+      proxy.getNewApplication(Records
+          .newRecord(GetNewApplicationRequest.class));
+      Assert.fail("Excepted RPC call to fail with unknown method.");
+    } catch (YarnException e) {
+      Assert.assertTrue(e.getMessage().matches(
+          "Unknown method getNewApplication called on.*"
+              + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+              + "\\$ApplicationClientProtocolService\\$BlockingInterface "
+              + "protocol."));
+    } catch (Exception e) {
+      e.printStackTrace();
+    } finally {
+      server.stop();
+    }
+  }
+
+  /** Exercises CollectorNodemanagerProtocol calls end to end over YarnRPC. */
+  @Test
+  public void testRPCOnCollectorNodeManagerProtocol() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
+        .getName());
+    YarnRPC rpc = YarnRPC.create(conf);
+    InetSocketAddress addr = NetUtils.createSocketAddr("localhost:0");
+    Server server = rpc.getServer(CollectorNodemanagerProtocol.class,
+        new DummyNMCollectorService(), addr, conf, null, 1);
+    server.start();
+    // Always stop the server, even if an assertion or an RPC below fails.
+    try {
+      // Test unrelated protocol wouldn't get response
+      ApplicationClientProtocol unknownProxy =
+          (ApplicationClientProtocol) rpc.getProxy(
+              ApplicationClientProtocol.class,
+              NetUtils.getConnectAddress(server), conf);
+      try {
+        unknownProxy.getNewApplication(Records
+            .newRecord(GetNewApplicationRequest.class));
+        Assert.fail("Excepted RPC call to fail with unknown method.");
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage().matches(
+            "Unknown method getNewApplication called on.*"
+                + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
+                + "\\$ApplicationClientProtocolService\\$BlockingInterface "
+                + "protocol."));
+      } catch (Exception e) {
+        e.printStackTrace();
+      }
+
+      // Test CollectorNodemanagerProtocol get proper response
+      CollectorNodemanagerProtocol proxy =
+          (CollectorNodemanagerProtocol) rpc.getProxy(
+              CollectorNodemanagerProtocol.class,
+              NetUtils.getConnectAddress(server), conf);
+      // A request carrying DEFAULT_APP_ID and DEFAULT_COLLECTOR_ADDR succeeds.
+      try {
+        ReportNewCollectorInfoRequest request =
+            ReportNewCollectorInfoRequest.newInstance(
+                DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+        proxy.reportNewCollectorInfo(request);
+      } catch (YarnException e) {
+        Assert.fail("RPC call failured is not expected here.");
+      }
+
+      // An empty request is rejected by design in DummyNMCollectorService.
+      try {
+        proxy.reportNewCollectorInfo(Records
+            .newRecord(ReportNewCollectorInfoRequest.class));
+        Assert.fail("Excepted RPC call to fail with YarnException.");
+      } catch (YarnException e) {
+        Assert.assertTrue(e.getMessage().contains(ILLEGAL_NUMBER_MESSAGE));
+      }
+
+      // Verify request with a valid app ID
+      try {
+        GetTimelineCollectorContextRequest request =
+            GetTimelineCollectorContextRequest.newInstance(
+                ApplicationId.newInstance(0, 1));
+        GetTimelineCollectorContextResponse response =
+            proxy.getTimelineCollectorContext(request);
+        Assert.assertEquals("test_user_id", response.getUserId());
+        Assert.assertEquals("test_flow_name", response.getFlowName());
+        Assert.assertEquals("test_flow_version", response.getFlowVersion());
+        Assert.assertEquals(12345678L, response.getFlowRunId());
+      } catch (YarnException | IOException e) {
+        Assert.fail("RPC call failured is not expected here.");
+      }
+
+      // Verify request with an invalid app ID
+      try {
+        GetTimelineCollectorContextRequest request =
+            GetTimelineCollectorContextRequest.newInstance(
+                ApplicationId.newInstance(0, 2));
+        proxy.getTimelineCollectorContext(request);
+        Assert.fail("RPC call failured is expected here.");
+      } catch (YarnException | IOException e) {
+        Assert.assertTrue(e instanceof  YarnException);
+        Assert.assertTrue(e.getMessage().contains(
+            "The application is not found."));
+      }
+    } finally {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testHadoopProtoRPC() throws Exception {
+    test(HadoopYarnProtoRPC.class.getName());
+  }
+
+  private void test(String rpcClass) throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
+    YarnRPC rpc = YarnRPC.create(conf);
+    String bindAddr = "localhost:0";
+    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
+    Server server = rpc.getServer(ContainerManagementProtocol.class,
+            new DummyContainerManager(), addr, conf, null, 1);
+    server.start();
+    RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class,
+        ProtobufRpcEngine.class);
+    ContainerManagementProtocol proxy = (ContainerManagementProtocol)
+        rpc.getProxy(ContainerManagementProtocol.class,
+            NetUtils.getConnectAddress(server), conf);
+    ContainerLaunchContext containerLaunchContext =
+        RECORD_FACTORY.newRecordInstance(ContainerLaunchContext.class);
+
+    ApplicationId applicationId = ApplicationId.newInstance(0, 0);
+    ApplicationAttemptId applicationAttemptId =
+        ApplicationAttemptId.newInstance(applicationId, 0);
+    ContainerId containerId =
+        ContainerId.newContainerId(applicationAttemptId, 100);
+    NodeId nodeId = NodeId.newInstance("localhost", 1234);
+    Resource resource = Resource.newInstance(1234, 2);
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(containerId, "localhost", "user",
+          resource, System.currentTimeMillis() + 10000, 42, 42,
+          Priority.newInstance(0), 0);
+    Token containerToken = newContainerToken(nodeId, "password".getBytes(),
+          containerTokenIdentifier);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(containerLaunchContext,
+          containerToken);
+    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    proxy.startContainers(allRequests);
+
+    List<ContainerId> containerIds = new ArrayList<ContainerId>();
+    containerIds.add(containerId);
+    GetContainerStatusesRequest gcsRequest =
+        GetContainerStatusesRequest.newInstance(containerIds);
+    GetContainerStatusesResponse response =
+        proxy.getContainerStatuses(gcsRequest);
+    List<ContainerStatus> statuses = response.getContainerStatuses();
+
+    //test remote exception
+    boolean exception = false;
+    try {
+      StopContainersRequest stopRequest =
+          RECORD_FACTORY.newRecordInstance(StopContainersRequest.class);
+      stopRequest.setContainerIds(containerIds);
+      proxy.stopContainers(stopRequest);
+    } catch (YarnException e) {
+      exception = true;
+      Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
+      Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
+      System.out.println("Test Exception is " + e.getMessage());
+    } catch (Exception ex) {
+      ex.printStackTrace();
+    } finally {
+      server.stop();
+    }
+    Assert.assertTrue(exception);
+    Assert.assertNotNull(statuses.get(0));
+    Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
+  }
+
+  public class DummyContainerManager implements ContainerManagementProtocol {
+
+    private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
+
+    @Override
+    public GetContainerStatusesResponse getContainerStatuses(
+        GetContainerStatusesRequest request)
+    throws YarnException {
+      GetContainerStatusesResponse response =
+          RECORD_FACTORY.newRecordInstance(GetContainerStatusesResponse.class);
+      response.setContainerStatuses(statuses);
+      return response;
+    }
+
+    @Override
+    public StartContainersResponse startContainers(
+        StartContainersRequest requests) throws YarnException {
+      StartContainersResponse response =
+          RECORD_FACTORY.newRecordInstance(StartContainersResponse.class);
+      for (StartContainerRequest request :
+          requests.getStartContainerRequests()) {
+        Token containerToken = request.getContainerToken();
+        ContainerTokenIdentifier tokenId = null;
+
+        try {
+          tokenId = newContainerTokenIdentifier(containerToken);
+        } catch (IOException e) {
+          throw RPCUtil.getRemoteException(e);
+        }
+        ContainerStatus status =
+            RECORD_FACTORY.newRecordInstance(ContainerStatus.class);
+        status.setState(ContainerState.RUNNING);
+        status.setContainerId(tokenId.getContainerID());
+        status.setExitStatus(0);
+        statuses.add(status);
+
+      }
+      return response;
+    }
+
+    @Override
+    public StopContainersResponse stopContainers(StopContainersRequest request)
+    throws YarnException {
+      Exception e = new Exception(EXCEPTION_MSG,
+          new Exception(EXCEPTION_CAUSE));
+      throw new YarnException(e);
+    }
+
+    @Override
+    public IncreaseContainersResourceResponse increaseContainersResource(
+        IncreaseContainersResourceRequest request)
+            throws YarnException, IOException {
+      return null;
+    }
+
+    @Override
+    public SignalContainerResponse signalToContainer(
+        SignalContainerRequest request) throws YarnException {
+      final Exception e = new Exception(EXCEPTION_MSG,
+          new Exception(EXCEPTION_CAUSE));
+      throw new YarnException(e);
+    }
+  }
+
+  /** Decodes the identifier carried by a YARN container token record. */
+  public static ContainerTokenIdentifier newContainerTokenIdentifier(
+      Token containerToken) throws IOException {
+    byte[] identifier = containerToken.getIdentifier().array();
+    byte[] password = containerToken.getPassword().array();
+    Text kind = new Text(containerToken.getKind());
+    Text service = new Text(containerToken.getService());
+    return new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>(
+        identifier, password, kind, service).decodeIdentifier();
+  }
+
+  /**
+   * Builds a token whose service is the ip:port form the RPC client expects.
+   */
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    InetSocketAddress serviceAddr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    String service = SecurityUtil.buildTokenService(serviceAddr).toString();
+    return Token.newInstance(tokenIdentifier.getBytes(),
+        ContainerTokenIdentifier.KIND.toString(), password, service);
+  }
+
+  // A dummy implementation for CollectorNodemanagerProtocol for test purpose,
+  // it only can accept one appID, collectorAddr pair or throw exceptions
+  public class DummyNMCollectorService
+      implements CollectorNodemanagerProtocol {
+
+    /**
+     * Accepts exactly one (appId, collectorAddr) pair, which is asserted to
+     * equal the test defaults; any other number raises a YarnException.
+     */
+    @Override
+    public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+        ReportNewCollectorInfoRequest request)
+        throws YarnException, IOException {
+      List<AppCollectorsMap> appCollectors = request.getAppCollectorsList();
+      if (appCollectors.size() != 1) {
+        throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
+      }
+      // JUnit expects (expected, actual); keep that order so that a failure
+      // message reports the right value as the expected one.
+      AppCollectorsMap appCollector = appCollectors.get(0);
+      Assert.assertEquals(DEFAULT_APP_ID, appCollector.getApplicationId());
+      Assert.assertEquals(DEFAULT_COLLECTOR_ADDR,
+          appCollector.getCollectorAddr());
+      return RECORD_FACTORY.newRecordInstance(
+          ReportNewCollectorInfoResponse.class);
+    }
+
+    @Override
+    public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+        GetTimelineCollectorContextRequest request)
+        throws YarnException, IOException {
+      // Only the application whose id is 1 is known to this dummy service.
+      if (request.getApplicationId().getId() != 1) {
+        throw new YarnException("The application is not found.");
+      }
+      return GetTimelineCollectorContextResponse.newInstance(
+          "test_user_id", "test_flow_name", "test_flow_version", 12345678L);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index e25547d..475ce58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -25,7 +25,9 @@ import static org.junit.Assert.assertTrue;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -107,11 +109,14 @@ public class TestYarnServerApiClasses {
     original.setLastKnownNMTokenMasterKey(getMasterKey());
     original.setNodeStatus(getNodeStatus());
     original.setNodeLabels(getValidNodeLabels());
+    Map<ApplicationId, String> collectors = getCollectors();
+    original.setRegisteredCollectors(collectors);
     NodeHeartbeatRequestPBImpl copy = new NodeHeartbeatRequestPBImpl(
         original.getProto());
     assertEquals(1, copy.getLastKnownContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getLastKnownNMTokenMasterKey().getKeyId());
     assertEquals("localhost", copy.getNodeStatus().getNodeId().getHost());
+    assertEquals(collectors, copy.getRegisteredCollectors());
     // check labels are coming with valid values
     Assert.assertTrue(original.getNodeLabels()
         .containsAll(copy.getNodeLabels()));
@@ -148,6 +153,8 @@ public class TestYarnServerApiClasses {
     original.setNextHeartBeatInterval(1000);
     original.setNodeAction(NodeAction.NORMAL);
     original.setResponseId(100);
+    Map<ApplicationId, String> collectors = getCollectors();
+    original.setAppCollectorsMap(collectors);
 
     NodeHeartbeatResponsePBImpl copy = new NodeHeartbeatResponsePBImpl(
         original.getProto());
@@ -157,6 +164,7 @@ public class TestYarnServerApiClasses {
     assertEquals(1, copy.getContainerTokenMasterKey().getKeyId());
     assertEquals(1, copy.getNMTokenMasterKey().getKeyId());
     assertEquals("testDiagnosticMessage", copy.getDiagnosticsMessage());
+    assertEquals(collectors, copy.getAppCollectorsMap());
     assertEquals(false, copy.getAreNodeLabelsAcceptedByRM());
    }
 
@@ -336,6 +344,15 @@ public class TestYarnServerApiClasses {
     return nodeLabels;
   }
 
+  /**
+   * Builds a map holding one application-to-collector-address entry.
+   */
+  private Map<ApplicationId, String> getCollectors() {
+    Map<ApplicationId, String> result = new HashMap<ApplicationId, String>();
+    result.put(ApplicationId.newInstance(1L, 1), "localhost:0");
+    return result;
+  }
+
   private ContainerStatus getContainerStatus(int applicationId,
       int containerID, int appAttemptId) {
     ContainerStatus status = recordFactory

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index cfcf1bd..131eaa3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
 /**
@@ -71,6 +73,13 @@ public interface Context {
 
   Map<ApplicationId, Credentials> getSystemCredentialsForApps();
 
+  /**
+   * Get the registered collectors that are located on this NM.
+   * @return registered collectors, or null if the timeline service v.2 is not
+   * enabled
+   */
+  Map<ApplicationId, String> getRegisteredCollectors();
+
   ConcurrentMap<ContainerId, Container> getContainers();
 
   ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
@@ -94,6 +103,8 @@ public interface Context {
 
   boolean getDecommissioned();
 
+  Configuration getConf();
+
   void setDecommissioned(boolean isDecommissioned);
 
   ConcurrentLinkedQueue<LogAggregationReport>
@@ -111,4 +122,8 @@ public interface Context {
   boolean isDistributedSchedulingEnabled();
 
   OpportunisticContainerAllocator getContainerAllocator();
+
+  void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
+
+  NMTimelinePublisher getNMTimelinePublisher();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index f7d226e..5bfbb8d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -60,6 +60,7 @@ import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
+import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
@@ -98,6 +100,8 @@ public class NodeManager extends CompositeService
   private Context context;
   private AsyncDispatcher dispatcher;
   private ContainerManagerImpl containerManager;
+  // the NM collector service is set only if the timeline service v.2 is enabled
+  private NMCollectorService nmCollectorService;
   private NodeStatusUpdater nodeStatusUpdater;
   private NodeResourceMonitor nodeResourceMonitor;
   private static CompositeServiceShutdownHook nodeManagerShutdownHook;
@@ -183,6 +187,10 @@ public class NodeManager extends CompositeService
     }
   }
 
+  protected NMCollectorService createNMCollectorService(Context ctxt) {
+    return new NMCollectorService(ctxt);
+  }
+
   protected WebServer createWebServer(Context nmContext,
       ResourceView resourceView, ApplicationACLsManager aclsManager,
       LocalDirsHandlerService dirsHandler) {
@@ -196,9 +204,10 @@ public class NodeManager extends CompositeService
   protected NMContext createNMContext(
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInNM nmTokenSecretManager,
-      NMStateStoreService stateStore, boolean isDistSchedulerEnabled) {
+      NMStateStoreService stateStore, boolean isDistSchedulerEnabled,
+      Configuration conf) {
     return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled);
+        dirsHandler, aclsManager, stateStore, isDistSchedulerEnabled, conf);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -331,7 +340,7 @@ public class NodeManager extends CompositeService
             YarnConfiguration.DIST_SCHEDULING_ENABLED_DEFAULT);
 
     this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager, nmStore, isDistSchedulingEnabled);
+        nmTokenSecretManager, nmStore, isDistSchedulingEnabled, conf);
 
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
@@ -374,6 +383,11 @@ public class NodeManager extends CompositeService
 
     DefaultMetricsSystem.initialize("NodeManager");
 
+    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+      this.nmCollectorService = createNMCollectorService(context);
+      addService(nmCollectorService);
+    }
+
     // StatusUpdater should be added last so that it get started last 
     // so that we make sure everything is up before registering with RM. 
     addService(nodeStatusUpdater);
@@ -457,6 +471,9 @@ public class NodeManager extends CompositeService
   public static class NMContext implements Context {
 
     private NodeId nodeId = null;
+
+    private Configuration conf = null;
+
     protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
 
@@ -466,6 +483,8 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
+    private Map<ApplicationId, String> registeredCollectors;
+
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
             new ConcurrentHashMap<>();
@@ -490,10 +509,16 @@ public class NodeManager extends CompositeService
 
     private final QueuingContext queuingContext;
 
+    private NMTimelinePublisher nmTimelinePublisher;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
-        NMStateStoreService stateStore, boolean isDistSchedulingEnabled) {
+        NMStateStoreService stateStore, boolean isDistSchedulingEnabled,
+        Configuration conf) {
+      if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
+        this.registeredCollectors = new ConcurrentHashMap<>();
+      }
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -506,6 +531,7 @@ public class NodeManager extends CompositeService
           LogAggregationReport>();
       this.queuingContext = new QueuingNMContext();
       this.isDistSchedulingEnabled = isDistSchedulingEnabled;
+      this.conf = conf;
     }
 
     /**
@@ -527,6 +553,11 @@ public class NodeManager extends CompositeService
     }
 
     @Override
+    public Configuration getConf() {
+      return this.conf;
+    }
+
+    @Override
     public ConcurrentMap<ContainerId, Container> getContainers() {
       return this.containers;
     }
@@ -645,6 +676,31 @@ public class NodeManager extends CompositeService
     public OpportunisticContainerAllocator getContainerAllocator() {
       return containerAllocator;
     }
+
+    @Override
+    public Map<ApplicationId, String> getRegisteredCollectors() {
+      return this.registeredCollectors;
+    }
+
+    public void addRegisteredCollectors(
+        Map<ApplicationId, String> newRegisteredCollectors) {
+      if (registeredCollectors != null) {
+        this.registeredCollectors.putAll(newRegisteredCollectors);
+      } else { // map is only created when timeline service v.2 is enabled
+        LOG.warn("collectors are added when the registered collectors are " +
+            "not initialized");
+      }
+    }
+
+    @Override
+    public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+      this.nmTimelinePublisher = nmMetricsPublisher;
+    }
+
+    @Override
+    public NMTimelinePublisher getNMTimelinePublisher() {
+      return nmTimelinePublisher;
+    }
   }
 
   /**
@@ -744,9 +800,22 @@ public class NodeManager extends CompositeService
     return this.context;
   }
 
+  /**
+   * Returns the NM collector service. It should be used only for testing
+   * purposes.
+   *
+   * @return the NM collector service, or null if the timeline service v.2 is
+   * not enabled
+   */
+  @VisibleForTesting
+  NMCollectorService getNMCollectorService() {
+    return this.nmCollectorService;
+  }
+
   public static void main(String[] args) throws IOException {
     Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
+    @SuppressWarnings("resource")
     NodeManager nodeManager = new NodeManager();
     Configuration conf = new YarnConfiguration();
     new GenericOptionsParser(conf, args);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7064fa3..e4a7903 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -81,11 +81,13 @@ import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.nodelabels.NodeLabelsProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -813,7 +815,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                         .getContainerTokenSecretManager().getCurrentKey(),
                     NodeStatusUpdaterImpl.this.context
                         .getNMTokenSecretManager().getCurrentKey(),
-                    nodeLabelsForHeartbeat);
+                    nodeLabelsForHeartbeat,
+                    NodeStatusUpdaterImpl.this.context.getRegisteredCollectors());
 
             if (logAggregationEnabled) {
               // pull log aggregation status for application running in this NM
@@ -905,6 +908,10 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                     newResource.toString());
               }
             }
+            if (YarnConfiguration.timelineServiceV2Enabled(context.getConf())) {
+              updateTimelineClientsAddress(response);
+            }
+
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(
@@ -932,6 +939,44 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         }
       }
 
+
+      private void updateTimelineClientsAddress(
+          NodeHeartbeatResponse response) {
+        Map<ApplicationId, String> knownCollectorsMap =
+            response.getAppCollectorsMap();
+        if (knownCollectorsMap == null) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("No collectors to update RM");
+          }
+        } else {
+          Set<Map.Entry<ApplicationId, String>> rmKnownCollectors =
+              knownCollectorsMap.entrySet();
+          for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+            ApplicationId appId = entry.getKey();
+            String collectorAddr = entry.getValue();
+
+            // Only handle applications that are known to this node's context,
+            // and skip those whose collector is already registered locally.
+            Application application = context.getApplications().get(appId);
+            // TODO this logic could be problematic if the collector address
+            // gets updated due to NM restart or collector service failure
+            if (application != null &&
+                !context.getRegisteredCollectors().containsKey(appId)) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Sync a new collector address: " + collectorAddr +
+                    " for application: " + appId + " from RM.");
+              }
+              NMTimelinePublisher nmTimelinePublisher =
+                  context.getNMTimelinePublisher();
+              if (nmTimelinePublisher != null) {
+                nmTimelinePublisher.setTimelineServiceAddress(
+                    application.getAppId(), collectorAddr);
+              }
+            }
+          }
+        }
+      }
+
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
new file mode 100644
index 0000000..d667c0e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
+
+/**
+ * Service that handles collector information. It is used only if the timeline
+ * service v.2 is enabled.
+ */
+public class NMCollectorService extends CompositeService implements
+    CollectorNodemanagerProtocol {
+
+  private static final Log LOG = LogFactory.getLog(NMCollectorService.class);
+
+  private final Context context;
+
+  private Server server;
+
+  public NMCollectorService(Context context) {
+    super(NMCollectorService.class.getName());
+    this.context = context;
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    Configuration conf = getConfig();
+
+    InetSocketAddress collectorServerAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+    Configuration serverConf = new Configuration(conf);
+
+    // TODO Security settings.
+    YarnRPC rpc = YarnRPC.create(conf);
+
+    server =
+        rpc.getServer(CollectorNodemanagerProtocol.class, this,
+            collectorServerAddress, serverConf,
+            this.context.getNMTokenSecretManager(),
+            conf.getInt(YarnConfiguration.NM_COLLECTOR_SERVICE_THREAD_COUNT,
+                YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT));
+
+    server.start();
+    collectorServerAddress = conf.updateConnectAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        server.getListenerAddress());
+    // start remaining services
+    super.serviceStart();
+    LOG.info("NMCollectorService started at " + collectorServerAddress);
+  }
+
+
+  @Override
+  public void serviceStop() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+    // TODO may cleanup app collectors running on this NM in future.
+    super.serviceStop();
+  }
+
+  @Override
+  public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+      ReportNewCollectorInfoRequest request) throws YarnException, IOException {
+    List<AppCollectorsMap> newCollectorsList = request.getAppCollectorsList();
+    if (newCollectorsList != null && !newCollectorsList.isEmpty()) {
+      Map<ApplicationId, String> newCollectorsMap =
+          new HashMap<ApplicationId, String>();
+      for (AppCollectorsMap collector : newCollectorsList) {
+        ApplicationId appId = collector.getApplicationId();
+        String collectorAddr = collector.getCollectorAddr();
+        newCollectorsMap.put(appId, collectorAddr);
+        // set registered collector address to TimelineClient.
+        NMTimelinePublisher nmTimelinePublisher =
+            context.getNMTimelinePublisher();
+        if (nmTimelinePublisher != null) {
+          nmTimelinePublisher.setTimelineServiceAddress(appId, collectorAddr);
+        }
+      }
+      ((NodeManager.NMContext)context).addRegisteredCollectors(
+          newCollectorsMap);
+    }
+
+    return ReportNewCollectorInfoResponse.newInstance();
+  }
+
+  @Override
+  public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException {
+    Application app = context.getApplications().get(request.getApplicationId());
+    if (app == null) {
+      throw new YarnException("Application " + request.getApplicationId() +
+          " doesn't exist on NM.");
+    }
+    return GetTimelineCollectorContextResponse.newInstance(
+        app.getUser(), app.getFlowName(), app.getFlowVersion(),
+        app.getFlowRunId());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/60a79b8e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java
new file mode 100644
index 0000000..7bf597b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/package-info.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.server.nodemanager.collectormanager contains
+ * classes for handling timeline collector information.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.yarn.server.nodemanager.collectormanager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message