hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rohithsharm...@apache.org
Subject [2/2] hadoop git commit: YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.
Date Fri, 11 Aug 2017 07:09:48 GMT
YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/79806939
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/79806939
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/79806939

Branch: refs/heads/YARN-5355_branch2
Commit: 798069390739ab6971ca038aad4cd0adc4b9855a
Parents: e2ffa0f
Author: Rohith Sharma K S <rohithsharmaks@apache.org>
Authored: Fri Aug 11 12:35:35 2017 +0530
Committer: Rohith Sharma K S <rohithsharmaks@apache.org>
Committed: Fri Aug 11 12:35:35 2017 +0530

----------------------------------------------------------------------
 .../v2/app/rm/RMContainerAllocator.java         |   9 +-
 .../app/local/TestLocalContainerAllocator.java  |   2 +-
 .../api/protocolrecords/AllocateResponse.java   |  92 +++++++++---
 .../hadoop/yarn/api/records/CollectorInfo.java  |  55 +++++++
 .../src/main/proto/yarn_protos.proto            |   5 +
 .../src/main/proto/yarn_service_protos.proto    |   2 +-
 .../api/async/impl/AMRMClientAsyncImpl.java     |   6 +-
 .../ApplicationMasterServiceProtoTestBase.java  |  72 +++++++++
 .../hadoop/yarn/client/ProtocolHATestBase.java  |  22 ++-
 ...ationMasterServiceProtocolForTimelineV2.java |  71 +++++++++
 ...estApplicationMasterServiceProtocolOnHA.java |  46 +-----
 .../api/async/impl/TestAMRMClientAsync.java     |   4 +-
 .../impl/pb/AllocateResponsePBImpl.java         |  37 ++++-
 .../records/impl/pb/CollectorInfoPBImpl.java    | 148 +++++++++++++++++++
 .../hadoop/yarn/api/TestPBImplRecords.java      |   2 +
 .../ReportNewCollectorInfoRequest.java          |   5 +-
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  25 +++-
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  21 ++-
 .../pb/ReportNewCollectorInfoRequestPBImpl.java |   4 +-
 .../server/api/records/AppCollectorData.java    |  27 +++-
 .../records/impl/pb/AppCollectorDataPBImpl.java |  29 +++-
 .../yarn_server_common_service_protos.proto     |   2 +
 .../java/org/apache/hadoop/yarn/TestRPC.java    |  30 +++-
 .../hadoop/yarn/TestYarnServerApiClasses.java   |   4 +-
 .../nodemanager/NodeStatusUpdaterImpl.java      |   1 -
 .../application/ApplicationImpl.java            |   2 +-
 .../amrmproxy/MockResourceManagerFacade.java    |   2 +-
 .../ApplicationMasterService.java               |  10 +-
 .../server/resourcemanager/rmapp/RMApp.java     |  15 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |  10 +-
 .../applicationsmanager/MockAsm.java            |   6 +
 .../server/resourcemanager/rmapp/MockRMApp.java |   7 +-
 .../TestTimelineServiceClientIntegration.java   |   2 +-
 .../security/TestTimelineAuthFilterForV2.java   | 121 +++++++++++----
 .../collector/AppLevelTimelineCollector.java    |  24 +++
 .../AppLevelTimelineCollectorWithAgg.java       |   4 +-
 .../collector/NodeTimelineCollectorManager.java |  83 +++++++++--
 .../PerNodeTimelineCollectorsAuxService.java    |   7 +-
 ...neV2DelegationTokenSecretManagerService.java |  31 ++++
 .../TestNMTimelineCollectorManager.java         |   4 +-
 40 files changed, 887 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 218e218..d681940 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -860,13 +860,16 @@ public class RMContainerAllocator extends RMContainerRequestor
     handleUpdatedNodes(response);
     handleJobPriorityChange(response);
     // handle receiving the timeline collector address for this app
-    String collectorAddr = response.getCollectorAddr();
+    String collectorAddr = null;
+    if (response.getCollectorInfo() != null) {
+      collectorAddr = response.getCollectorInfo().getCollectorAddr();
+    }
+
     MRAppMaster.RunningAppContext appContext =
         (MRAppMaster.RunningAppContext)this.getContext();
     if (collectorAddr != null && !collectorAddr.isEmpty()
         && appContext.getTimelineV2Client() != null) {
-      appContext.getTimelineV2Client().setTimelineServiceAddress(
-          response.getCollectorAddr());
+      appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
     }
 
     for (ContainerStatus cont : finishedContainers) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
index 3fa0043..9549b68 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/local/TestLocalContainerAllocator.java
@@ -297,7 +297,7 @@ public class TestLocalContainerAllocator {
           Resources.none(), null, 1, null,
           Collections.<NMToken>emptyList(),
           yarnToken,
-          Collections.<UpdatedContainer>emptyList());
+          Collections.<UpdatedContainer>emptyList(), null);
       response.setApplicationPriority(Priority.newInstance(0));
       return response;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c369c3c..b2b40a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -96,7 +97,8 @@ public abstract class AllocateResponse {
 
   /**
    * Use {@link AllocateResponse#newInstance(int, List, List, List, Resource,
-   * AMCommand, int, PreemptionMessage, List, Token, List)} instead
+   * AMCommand, int, PreemptionMessage, List, Token, List, CollectorInfo)}
+   * instead.
    * @param responseId responseId
    * @param completedContainers completedContainers
    * @param allocatedContainers allocatedContainers
@@ -117,10 +119,14 @@ public abstract class AllocateResponse {
       Resource availResources, AMCommand command, int numClusterNodes,
       PreemptionMessage preempt, List<NMToken> nmTokens,
       List<ContainerResourceIncrease> increasedContainers,
-      List<ContainerResourceDecrease> decreasedContainers) {
-    return newInstance(responseId, completedContainers, allocatedContainers,
-        updatedNodes, availResources, command, numClusterNodes, preempt,
-        nmTokens);
+      List<ContainerResourceDecrease> decreasedContainers,
+      CollectorInfo collectorInfo) {
+    return AllocateResponse.newBuilder().responseId(responseId)
+        .completedContainersStatuses(completedContainers)
+        .allocatedContainers(allocatedContainers)
+        .updatedNodes(updatedNodes).availableResources(availResources)
+        .amCommand(command).nmTokens(nmTokens).collectorInfo(collectorInfo)
+        .build();
   }
 
   @Public
@@ -147,14 +153,15 @@ public abstract class AllocateResponse {
       List<Container> allocatedContainers, List<NodeReport> updatedNodes,
       Resource availResources, AMCommand command, int numClusterNodes,
       PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
-      List<UpdatedContainer> updatedContainers) {
+      List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
     return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
         .responseId(responseId)
         .completedContainersStatuses(completedContainers)
         .allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
         .availableResources(availResources).amCommand(command)
         .preemptionMessage(preempt).nmTokens(nmTokens)
-        .updatedContainers(updatedContainers).amRmToken(amRMToken).build();
+        .updatedContainers(updatedContainers).amRmToken(amRMToken)
+        .collectorInfo(collectorInfo).build();
   }
 
   /**
@@ -346,6 +353,20 @@ public abstract class AllocateResponse {
   public abstract void setApplicationPriority(Priority priority);
 
   /**
+   * The data associated with the collector that belongs to this app. Contains
+   * address and token alongwith identification information.
+   *
+   * @return The data of collector that belong to this attempt
+   */
+  @Public
+  @Unstable
+  public abstract CollectorInfo getCollectorInfo();
+
+  @Private
+  @Unstable
+  public abstract void setCollectorInfo(CollectorInfo info);
+
+  /**
    * Get the list of container update errors to inform the
    * Application Master about the container updates that could not be
    * satisfied due to error.
@@ -544,6 +565,50 @@ public abstract class AllocateResponse {
     }
 
     /**
+     * Set the <code>applicationPriority</code> of the response.
+     * @see AllocateResponse#setApplicationPriority(Priority)
+     * @param applicationPriority
+     *     <code>applicationPriority</code> of the response
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder applicationPriority(
+        Priority applicationPriority) {
+      allocateResponse.setApplicationPriority(applicationPriority);
+      return this;
+    }
+
+    /**
+     * Set the <code>collectorInfo</code> of the response.
+     * @see AllocateResponse#setCollectorInfo(CollectorInfo)
+     * @param collectorInfo <code>collectorInfo</code> of the response which
+     *    contains collector address, RM id, version and collector token.
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder collectorInfo(
+        CollectorInfo collectorInfo) {
+      allocateResponse.setCollectorInfo(collectorInfo);
+      return this;
+    }
+
+    /**
+     * Set the <code>updateErrors</code> of the response.
+     * @see AllocateResponse#setUpdateErrors(List)
+     * @param updateErrors <code>updateErrors</code> of the response
+     * @return {@link AllocateResponseBuilder}
+     */
+    @Private
+    @Unstable
+    public AllocateResponseBuilder updateErrors(
+        List<UpdateContainerError> updateErrors) {
+      allocateResponse.setUpdateErrors(updateErrors);
+      return this;
+    }
+
+    /**
      * Return generated {@link AllocateResponse} object.
      * @return {@link AllocateResponse}
      */
@@ -568,17 +633,4 @@ public abstract class AllocateResponse {
   @Deprecated
   public abstract List<ContainerResourceDecrease> getDecreasedContainers();
 
-  /**
-   * The address of collector that belong to this app
-   *
-   * @return The address of collector that belong to this attempt
-   */
-  @Public
-  @Unstable
-  public abstract String getCollectorAddr();
-
-  @Private
-  @Unstable
-  public abstract void setCollectorAddr(String collectorAddr);
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
new file mode 100644
index 0000000..960c992
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/CollectorInfo.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Collector info containing collector address and collector token passed from
+ * RM to AM in Allocate Response.
+ */
+@Private
+@InterfaceStability.Unstable
+public abstract class CollectorInfo {
+
+  protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
+
+  public static CollectorInfo newInstance(String collectorAddr, Token token) {
+    CollectorInfo amCollectorInfo =
+        Records.newRecord(CollectorInfo.class);
+    amCollectorInfo.setCollectorAddr(collectorAddr);
+    amCollectorInfo.setCollectorToken(token);
+    return amCollectorInfo;
+  }
+
+  public abstract String getCollectorAddr();
+
+  public abstract void setCollectorAddr(String addr);
+
+  /**
+   * Get delegation token for app collector which AM will use to publish
+   * entities.
+   * @return the delegation token for app collector.
+   */
+  public abstract Token getCollectorToken();
+
+  public abstract void setCollectorToken(Token token);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
index 4af5a97..59d9141 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto
@@ -635,3 +635,8 @@ message ContainerResourceDecreaseProto {
   optional ContainerIdProto container_id = 1;
   optional ResourceProto capability = 2;
 }
+
+message CollectorInfoProto {
+  optional string collector_addr = 1;
+  optional hadoop.common.TokenProto collector_token = 2;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 2a668bd..a212a7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -114,7 +114,7 @@ message AllocateResponseProto {
   repeated ContainerResourceDecreaseProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
   optional PriorityProto application_priority = 13;
-  optional string collector_addr = 14;
+  optional CollectorInfoProto collector_info = 14;
   repeated UpdateContainerErrorProto update_errors = 15;
   repeated UpdatedContainerProto updated_containers = 16;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index 8b2557c..eb94b28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -327,7 +327,11 @@ extends AMRMClientAsync<T> {
 
           AllocateResponse response = (AllocateResponse) object;
 
-          String collectorAddress = response.getCollectorAddr();
+          String collectorAddress = null;
+          if (response.getCollectorInfo() != null) {
+            collectorAddress = response.getCollectorInfo().getCollectorAddr();
+          }
+
           TimelineV2Client timelineClient =
               client.getRegisteredTimelineV2Client();
           if (timelineClient != null && collectorAddress != null

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
new file mode 100644
index 0000000..4521018
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ApplicationMasterServiceProtoTestBase.java
@@ -0,0 +1,72 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.junit.After;
+
+/**
+ * Test Base for Application Master Service Protocol.
+ */
+public abstract class ApplicationMasterServiceProtoTestBase
+    extends ProtocolHATestBase {
+
+  private ApplicationMasterProtocol amClient;
+  private ApplicationAttemptId attemptId;
+
+  protected void startupHAAndSetupClient() throws Exception {
+    attemptId = this.cluster.createFakeApplicationAttemptId();
+
+    Token<AMRMTokenIdentifier> appToken =
+        this.cluster.getResourceManager().getRMContext()
+          .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
+    appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
+    UserGroupInformation.setLoginUser(UserGroupInformation
+        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
+    UserGroupInformation.getCurrentUser().addToken(appToken);
+    syncToken(appToken);
+    amClient = ClientRMProxy
+        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
+  }
+
+  @After
+  public void shutDown() {
+    if(this.amClient != null) {
+      RPC.stopProxy(this.amClient);
+    }
+  }
+
+  protected ApplicationMasterProtocol getAMClient() {
+    return amClient;
+  }
+
+  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
+    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
+      this.cluster.getResourceManager(i).getRMContext()
+          .getAMRMTokenSecretManager().addPersistedPassword(token);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
index a8e9132..efb1987 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/ProtocolHATestBase.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -98,6 +99,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -804,11 +806,21 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
     }
 
     public AllocateResponse createFakeAllocateResponse() {
-      return AllocateResponse.newInstance(-1,
-          new ArrayList<ContainerStatus>(),
-          new ArrayList<Container>(), new ArrayList<NodeReport>(),
-          Resource.newInstance(1024, 2), null, 1,
-          null, new ArrayList<NMToken>());
+      if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
+        return AllocateResponse.newInstance(-1,
+            new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
+            new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
+            null, new ArrayList<NMToken>(), null,
+            new ArrayList<UpdatedContainer>(),
+            CollectorInfo.newInstance("host:port", Token.newInstance(
+            new byte[] {0}, "TIMELINE", new byte[] {0}, "rm")));
+      } else {
+        return AllocateResponse.newInstance(-1,
+            new ArrayList<ContainerStatus>(),
+            new ArrayList<Container>(), new ArrayList<NodeReport>(),
+            Resource.newInstance(1024, 2), null, 1,
+            null, new ArrayList<NMToken>());
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
new file mode 100644
index 0000000..be8c302
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolForTimelineV2.java
@@ -0,0 +1,71 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests Application Master Protocol with timeline service v2 enabled.
+ */
+public class TestApplicationMasterServiceProtocolForTimelineV2
+    extends ApplicationMasterServiceProtoTestBase {
+
+  @Before
+  public void initialize() throws Exception {
+    HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
+    HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf);
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
+    conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    startHACluster(0, false, false, true);
+    super.startupHAAndSetupClient();
+  }
+
+  @Test(timeout = 15000)
+  public void testAllocateForTimelineV2OnHA()
+      throws YarnException, IOException {
+    AllocateRequest request = AllocateRequest.newInstance(0, 50f,
+        new ArrayList<ResourceRequest>(),
+        new ArrayList<ContainerId>(),
+        ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
+            new ArrayList<String>()));
+    AllocateResponse response = getAMClient().allocate(request);
+    Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
+    Assert.assertNotNull(response.getCollectorInfo());
+    Assert.assertEquals("host:port",
+        response.getCollectorInfo().getCollectorAddr());
+    Assert.assertNotNull(response.getCollectorInfo().getCollectorToken());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
index ad86fb3..c2f39a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestApplicationMasterServiceProtocolOnHA.java
@@ -23,10 +23,6 @@ import java.util.ArrayList;
 
 import org.junit.Assert;
 
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
@@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
 
 public class TestApplicationMasterServiceProtocolOnHA
-    extends ProtocolHATestBase {
-  private ApplicationMasterProtocol amClient;
-  private ApplicationAttemptId attemptId ;
-
+    extends ApplicationMasterServiceProtoTestBase {
   @Before
   public void initialize() throws Exception {
     startHACluster(0, false, false, true);
-    attemptId = this.cluster.createFakeApplicationAttemptId();
-
-    Token<AMRMTokenIdentifier> appToken =
-        this.cluster.getResourceManager().getRMContext()
-          .getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
-    appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
-    UserGroupInformation.setLoginUser(UserGroupInformation
-        .createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
-    UserGroupInformation.getCurrentUser().addToken(appToken);
-    syncToken(appToken);
-
-    amClient = ClientRMProxy
-        .createRMProxy(this.conf, ApplicationMasterProtocol.class);
-  }
-
-  @After
-  public void shutDown() {
-    if(this.amClient != null) {
-      RPC.stopProxy(this.amClient);
-    }
+    super.startupHAAndSetupClient();
   }
 
   @Test(timeout = 15000)
@@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA
     RegisterApplicationMasterRequest request =
         RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
     RegisterApplicationMasterResponse response =
-        amClient.registerApplicationMaster(request);
+        getAMClient().registerApplicationMaster(request);
     Assert.assertEquals(response,
         this.cluster.createFakeRegisterApplicationMasterResponse());
   }
@@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA
         FinishApplicationMasterRequest.newInstance(
             FinalApplicationStatus.SUCCEEDED, "", "");
     FinishApplicationMasterResponse response =
-        amClient.finishApplicationMaster(request);
+        getAMClient().finishApplicationMaster(request);
     Assert.assertEquals(response,
         this.cluster.createFakeFinishApplicationMasterResponse());
   }
@@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA
         new ArrayList<ContainerId>(),
         ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
             new ArrayList<String>()));
-    AllocateResponse response = amClient.allocate(request);
+    AllocateResponse response = getAMClient().allocate(request);
     Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
   }
-
-  private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
-    for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
-      this.cluster.getResourceManager(i).getRMContext()
-          .getAMRMTokenSecretManager().addPersistedPassword(token);
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
index ba38340..9c64412 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestAMRMClientAsync.java
@@ -426,8 +426,8 @@ public class TestAMRMClientAsync {
     }
     AllocateResponse response =
         AllocateResponse.newInstance(0, completed, allocated,
-            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
-            updatedContainers);
+            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null,
+            updatedContainers, null);
     return response;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index bd82016..61d16e9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerResourceDecrease;
@@ -40,6 +41,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.UpdateContainerError;
 import org.apache.hadoop.yarn.api.records.UpdatedContainer;
+import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -50,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -82,6 +85,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private PreemptionMessage preempt;
   private Token amrmToken = null;
   private Priority appPriority = null;
+  private CollectorInfo collectorInfo = null;
 
   public AllocateResponsePBImpl() {
     builder = AllocateResponseProto.newBuilder();
@@ -164,6 +168,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     if (this.amrmToken != null) {
       builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
     }
+    if (this.collectorInfo != null) {
+      builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo));
+    }
     if (this.appPriority != null) {
       builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
     }
@@ -410,19 +417,25 @@ public class AllocateResponsePBImpl extends AllocateResponse {
 
 
   @Override
-  public synchronized String getCollectorAddr() {
+  public synchronized CollectorInfo getCollectorInfo() {
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
-    return p.getCollectorAddr();
+    if (this.collectorInfo != null) {
+      return this.collectorInfo;
+    }
+    if (!p.hasCollectorInfo()) {
+      return null;
+    }
+    this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo());
+    return this.collectorInfo;
   }
 
   @Override
-  public synchronized void setCollectorAddr(String collectorAddr) {
+  public synchronized void setCollectorInfo(CollectorInfo info) {
     maybeInitBuilder();
-    if (collectorAddr == null) {
-      builder.clearCollectorAddr();
-      return;
+    if (info == null) {
+      builder.clearCollectorInfo();
     }
-    builder.setCollectorAddr(collectorAddr);
+    this.collectorInfo = info;
   }
 
   @Override
@@ -730,6 +743,16 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     return ((NodeReportPBImpl)t).getProto();
   }
 
+  private synchronized CollectorInfoPBImpl convertFromProtoFormat(
+      CollectorInfoProto p) {
+    return new CollectorInfoPBImpl(p);
+  }
+
+  private synchronized CollectorInfoProto convertToProtoFormat(
+      CollectorInfo t) {
+    return ((CollectorInfoPBImpl)t).getProto();
+  }
+
   private synchronized ContainerPBImpl convertFromProtoFormat(
       ContainerProto p) {
     return new ContainerPBImpl(p);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
new file mode 100644
index 0000000..bb54133
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/CollectorInfoPBImpl.java
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder;
+
+import com.google.protobuf.TextFormat;
+
+/**
+ * Protocol record implementation of {@link CollectorInfo}.
+ */
+public class CollectorInfoPBImpl extends CollectorInfo {
+
+  private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance();
+
+  private CollectorInfoProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private String collectorAddr = null;
+  private Token collectorToken = null;
+
+
+  public CollectorInfoPBImpl() {
+    builder = CollectorInfoProto.newBuilder();
+  }
+
+  public CollectorInfoPBImpl(CollectorInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public CollectorInfoProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = CollectorInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  @Override
+  public String getCollectorAddr() {
+    CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorAddr == null && p.hasCollectorAddr()) {
+      this.collectorAddr = p.getCollectorAddr();
+    }
+    return this.collectorAddr;
+  }
+
+  @Override
+  public void setCollectorAddr(String addr) {
+    maybeInitBuilder();
+    if (collectorAddr == null) {
+      builder.clearCollectorAddr();
+    }
+    this.collectorAddr = addr;
+  }
+
+  @Override
+  public Token getCollectorToken() {
+    CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorToken == null && p.hasCollectorToken()) {
+      this.collectorToken = convertFromProtoFormat(p.getCollectorToken());
+    }
+    return this.collectorToken;
+  }
+
+  @Override
+  public void setCollectorToken(Token token) {
+    maybeInitBuilder();
+    if (token == null) {
+      builder.clearCollectorToken();
+    }
+    this.collectorToken = token;
+  }
+
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
+  private void mergeLocalToBuilder() {
+    if (this.collectorAddr != null) {
+      builder.setCollectorAddr(this.collectorAddr);
+    }
+    if (this.collectorToken != null) {
+      builder.setCollectorToken(convertToProtoFormat(this.collectorToken));
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
index 932e078..bc657ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java
@@ -100,6 +100,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -405,6 +406,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
     generateByNewInstance(ApplicationTimeout.class);
     generateByNewInstance(ContainerResourceIncreaseRequest.class);
     generateByNewInstance(QueueConfigurations.class);
+    generateByNewInstance(CollectorInfo.class);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
index 1503eca..a4c1a38 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -22,6 +22,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.util.Records;
 
@@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest {
   }
 
   public static ReportNewCollectorInfoRequest newInstance(
-      ApplicationId id, String collectorAddr) {
+      ApplicationId id, String collectorAddr, Token token) {
     ReportNewCollectorInfoRequest request =
         Records.newRecord(ReportNewCollectorInfoRequest.class);
     request.setAppCollectorsList(
-        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
+        Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token)));
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 73a8abe..c07a6eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -26,23 +26,26 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
@@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     builder.clearRegisteringCollectors();
     for (Map.Entry<ApplicationId, AppCollectorData> entry :
         registeringCollectors.entrySet()) {
+      AppCollectorData data = entry.getValue();
       builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
-          .setAppCollectorAddr(entry.getValue().getCollectorAddr()));
+          .setAppCollectorAddr(data.getCollectorAddr())
+          .setAppCollectorToken(convertToProtoFormat(data.getCollectorToken()))
+          .setRmIdentifier(data.getRMIdentifier())
+          .setVersion(data.getVersion()));
     }
   }
 
@@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       this.registeringCollectors = new HashMap<>();
       for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
         AppCollectorData data = AppCollectorData.newInstance(appId,
-            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+            collectorToken);
         this.registeringCollectors.put(appId, data);
       }
     }
@@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
     return ((MasterKeyPBImpl)t).getProto();
   }
 
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
   @Override
   public Set<NodeLabel> getNodeLabels() {
     initNodeLabels();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index bc4e802..3f4b4ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -26,32 +26,35 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
-import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -153,6 +156,8 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.addAppCollectors(AppCollectorDataProto.newBuilder()
           .setAppId(convertToProtoFormat(entry.getKey()))
           .setAppCollectorAddr(data.getCollectorAddr())
+          .setAppCollectorToken(
+              convertToProtoFormat(entry.getValue().getCollectorToken()))
           .setRmIdentifier(data.getRMIdentifier())
           .setVersion(data.getVersion()));
     }
@@ -598,8 +603,10 @@ public class NodeHeartbeatResponsePBImpl extends
       this.appCollectorsMap = new HashMap<>();
       for (AppCollectorDataProto c : list) {
         ApplicationId appId = convertFromProtoFormat(c.getAppId());
+        Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
         AppCollectorData data = AppCollectorData.newInstance(appId,
-            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
+            c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
+            collectorToken);
         this.appCollectorsMap.put(appId, data);
       }
     }
@@ -779,5 +786,13 @@ public class NodeHeartbeatResponsePBImpl extends
       SignalContainerRequest t) {
     return ((SignalContainerRequestPBImpl)t).getProto();
   }
+
+  private TokenProto convertToProtoFormat(Token t) {
+    return ((TokenPBImpl) t).getProto();
+  }
+
+  private TokenPBImpl convertFromProtoFormat(TokenProto p) {
+    return new TokenPBImpl(p);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
index 3f3dcf5..5ffc3a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/ReportNewCollectorInfoRequestPBImpl.java
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
 
 public class ReportNewCollectorInfoRequestPBImpl extends
     ReportNewCollectorInfoRequest {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
index da2e5de..5266dca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/AppCollectorData.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.util.Records;
 
 
@@ -31,20 +32,32 @@ public abstract class AppCollectorData {
   protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
 
   public static AppCollectorData newInstance(
-      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version,
+      Token token) {
     AppCollectorData appCollectorData =
         Records.newRecord(AppCollectorData.class);
     appCollectorData.setApplicationId(id);
     appCollectorData.setCollectorAddr(collectorAddr);
     appCollectorData.setRMIdentifier(rmIdentifier);
     appCollectorData.setVersion(version);
+    appCollectorData.setCollectorToken(token);
     return appCollectorData;
   }
 
+  public static AppCollectorData newInstance(
+      ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
+    return newInstance(id, collectorAddr, rmIdentifier, version, null);
+  }
+
   public static AppCollectorData newInstance(ApplicationId id,
-      String collectorAddr) {
+      String collectorAddr, Token token) {
     return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
-        DEFAULT_TIMESTAMP_VALUE);
+        DEFAULT_TIMESTAMP_VALUE, token);
+  }
+
+  public static AppCollectorData newInstance(ApplicationId id,
+      String collectorAddr) {
+    return newInstance(id, collectorAddr, null);
   }
 
   /**
@@ -101,4 +114,12 @@ public abstract class AppCollectorData {
 
   public abstract void setVersion(long version);
 
+  /**
+   * Get delegation token for app collector which AM will use to publish
+   * entities.
+   * @return the delegation token for app collector.
+   */
+  public abstract Token getCollectorToken();
+
+  public abstract void setCollectorToken(Token token);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
index 7d3a805..7144f51 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/AppCollectorDataPBImpl.java
@@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
-
+import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
@@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
   private String collectorAddr = null;
   private Long rmIdentifier = null;
   private Long version = null;
+  private Token collectorToken = null;
 
   public AppCollectorDataPBImpl() {
     builder = AppCollectorDataProto.newBuilder();
@@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
     builder.setVersion(version);
   }
 
+  @Override
+  public Token getCollectorToken() {
+    AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.collectorToken == null && p.hasAppCollectorToken()) {
+      this.collectorToken = new TokenPBImpl(p.getAppCollectorToken());
+    }
+    return this.collectorToken;
+  }
+
+  @Override
+  public void setCollectorToken(Token token) {
+    maybeInitBuilder();
+    if (token == null) {
+      builder.clearAppCollectorToken();
+    }
+    this.collectorToken = token;
+  }
+
   private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
     return new ApplicationIdPBImpl(p);
   }
@@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
     if (this.version != null) {
       builder.setVersion(this.version);
     }
+    if (this.collectorToken != null) {
+      builder.setAppCollectorToken(
+          ((TokenPBImpl)this.collectorToken).getProto());
+    }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index 7dbae4e..02bb76f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -22,6 +22,7 @@ option java_generic_services = true;
 option java_generate_equals_and_hash = true;
 package hadoop.yarn;
 
+import "Security.proto";
 import "yarn_protos.proto";
 import "yarn_server_common_protos.proto";
 import "yarn_service_protos.proto";
@@ -136,6 +137,7 @@ message AppCollectorDataProto {
   optional string app_collector_addr = 2;
   optional int64 rm_identifier = 3 [default = -1];
   optional int64 version = 4 [default = -1];
+  optional hadoop.common.TokenProto app_collector_token = 5;
 }
 
 //////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
index 7eb8944..45d2cb2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
@@ -91,6 +93,21 @@ public class TestRPC {
       "collectors' number in ReportNewCollectorInfoRequest is not ONE.";
 
   public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
+  private static final Token DEFAULT_COLLECTOR_TOKEN;
+  static {
+    TimelineDelegationTokenIdentifier identifier =
+        new TimelineDelegationTokenIdentifier();
+    identifier.setOwner(new Text("user"));
+    identifier.setRenewer(new Text("user"));
+    identifier.setRealUser(new Text("user"));
+    long now = Time.now();
+    identifier.setIssueDate(now);
+    identifier.setMaxDate(now + 1000L);
+    identifier.setMasterKeyId(500);
+    identifier.setSequenceNumber(5);
+    DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(),
+        identifier.getKind().toString(), identifier.getBytes(), "localhost:0");
+  }
 
   public static final ApplicationId DEFAULT_APP_ID =
       ApplicationId.newInstance(0, 0);
@@ -171,7 +188,16 @@ public class TestRPC {
     try {
       ReportNewCollectorInfoRequest request =
           ReportNewCollectorInfoRequest.newInstance(
-              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
+              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null);
+      proxy.reportNewCollectorInfo(request);
+    } catch (YarnException e) {
+      Assert.fail("RPC call failured is not expected here.");
+    }
+
+    try {
+      ReportNewCollectorInfoRequest request =
+          ReportNewCollectorInfoRequest.newInstance(
+              DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN);
       proxy.reportNewCollectorInfo(request);
     } catch (YarnException e) {
       Assert.fail("RPC call failured is not expected here.");
@@ -428,6 +454,8 @@ public class TestRPC {
             DEFAULT_APP_ID);
         Assert.assertEquals(appCollector.getCollectorAddr(),
             DEFAULT_COLLECTOR_ADDR);
+        Assert.assertTrue(appCollector.getCollectorToken() == null ||
+            appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN));
       } else {
         throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index ffd1b7e..008f1ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
@@ -351,7 +352,8 @@ public class TestYarnServerApiClasses {
   private Map<ApplicationId, AppCollectorData> getCollectors() {
     ApplicationId appID = ApplicationId.newInstance(1L, 1);
     String collectorAddr = "localhost:0";
-    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
+    AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr,
+        Token.newInstance(new byte[0], "kind", new byte[0], "s"));
     Map<ApplicationId, AppCollectorData> collectorMap =
         new HashMap<>();
     collectorMap.put(appID, data);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 7a9f82f..61f1d75 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
-
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
index aafb8d7..32fda01 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.proto.YarnProtos;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
index 499a5cb..dffd19e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/MockResourceManagerFacade.java
@@ -305,7 +305,7 @@ public class MockResourceManagerFacade implements
         new ArrayList<ContainerStatus>(), containerList,
         new ArrayList<NodeReport>(), null, AMCommand.AM_RESYNC, 1, null,
         new ArrayList<NMToken>(), newAMRMToken,
-        new ArrayList<UpdatedContainer>());
+        new ArrayList<UpdatedContainer>(), null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 5fe90fc..57ef493 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -48,8 +48,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
 import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
-
-
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -81,7 +80,6 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
@@ -640,9 +638,9 @@ public class ApplicationMasterService extends AbstractService implements
 
     // add collector address for this application
     if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
-      AppCollectorData data = app.getCollectorData();
-      if (data != null) {
-        allocateResponse.setCollectorAddr(data.getCollectorAddr());
+      CollectorInfo collectorInfo = app.getCollectorInfo();
+      if (collectorInfo != null) {
+        allocateResponse.setCollectorInfo(collectorInfo);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 1a0b920..93c41b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -187,14 +188,24 @@ public interface RMApp extends EventHandler<RMAppEvent> {
    * only if the timeline service v.2 is enabled.
    *
    * @return the data for the application's collector, including collector
-   * address, collector ID. Return null if the timeline service v.2 is not
-   * enabled.
+   * address, RM ID, version and collector token. Return null if the timeline
+   * service v.2 is not enabled.
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   AppCollectorData getCollectorData();
 
   /**
+   * The timeline collector information to be sent to AM. It should be used
+   * only if the timeline service v.2 is enabled.
+   *
+   * @return collector info, including collector address and collector token.
+   * Return null if the timeline service v.2 is not enabled.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  CollectorInfo getCollectorInfo();
+  /**
    * The original tracking url for the application master.
    * @return the original tracking url for the application master.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/79806939/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index fe0237e..623f564 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
 import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
+import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private int firstAttemptIdInStateStore = 1;
   private int nextAttemptId = 1;
   private AppCollectorData collectorData;
+  private CollectorInfo collectorInfo;
   // This field isn't protected by readlock now.
   private volatile RMAppAttempt currentAttempt;
   private String queue;
@@ -528,7 +530,7 @@ public class RMAppImpl implements RMApp, Recoverable {
    */
   public void startTimelineCollector() {
     AppLevelTimelineCollector collector =
-        new AppLevelTimelineCollector(applicationId);
+        new AppLevelTimelineCollector(applicationId, user);
     rmContext.getRMTimelineCollectorManager().putIfAbsent(
         applicationId, collector);
   }
@@ -616,6 +618,12 @@ public class RMAppImpl implements RMApp, Recoverable {
 
   public void setCollectorData(AppCollectorData incomingData) {
     this.collectorData = incomingData;
+    this.collectorInfo = CollectorInfo.newInstance(
+        incomingData.getCollectorAddr(), incomingData.getCollectorToken());
+  }
+
+  public CollectorInfo getCollectorInfo() {
+    return this.collectorInfo;
   }
 
   public void removeCollectorData() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message