hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From varunsax...@apache.org
Subject [31/50] [abbrv] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date Sun, 06 Nov 2016 16:31:48 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
deleted file mode 100644
index 64c7b42..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestRPC {
-
-  private static final String EXCEPTION_MSG = "test error";
-  private static final String EXCEPTION_CAUSE = "exception cause";
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
-  @Test
-  public void testUnknownCall() {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    Server server = rpc.getServer(ContainerManagementProtocol.class,
-        new DummyContainerManager(), addr, conf, null, 1);
-    server.start();
-
-    // Any unrelated protocol would do
-    ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
-
-    try {
-      proxy.getNewApplication(Records
-          .newRecord(GetNewApplicationRequest.class));
-      Assert.fail("Excepted RPC call to fail with unknown method.");
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().matches(
-          "Unknown method getNewApplication called on.*"
-              + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
-              + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
-  public void testHadoopProtoRPC() throws Exception {
-    test(HadoopYarnProtoRPC.class.getName());
-  }
-  
-  private void test(String rpcClass) throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    Server server = rpc.getServer(ContainerManagementProtocol.class, 
-            new DummyContainerManager(), addr, conf, null, 1);
-    server.start();
-    RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
-    ContainerManagementProtocol proxy = (ContainerManagementProtocol) 
-        rpc.getProxy(ContainerManagementProtocol.class, 
-            NetUtils.getConnectAddress(server), conf);
-    ContainerLaunchContext containerLaunchContext = 
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    ApplicationId applicationId = ApplicationId.newInstance(0, 0);
-    ApplicationAttemptId applicationAttemptId =
-        ApplicationAttemptId.newInstance(applicationId, 0);
-    ContainerId containerId =
-        ContainerId.newContainerId(applicationAttemptId, 100);
-    NodeId nodeId = NodeId.newInstance("localhost", 1234);
-    Resource resource = Resource.newInstance(1234, 2);
-    ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(containerId, "localhost", "user",
-          resource, System.currentTimeMillis() + 10000, 42, 42,
-          Priority.newInstance(0), 0);
-    Token containerToken = newContainerToken(nodeId, "password".getBytes(),
-          containerTokenIdentifier);
-
-    StartContainerRequest scRequest =
-        StartContainerRequest.newInstance(containerLaunchContext,
-          containerToken);
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
-    list.add(scRequest);
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    proxy.startContainers(allRequests);
-
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
-    containerIds.add(containerId);
-    GetContainerStatusesRequest gcsRequest =
-        GetContainerStatusesRequest.newInstance(containerIds);
-    GetContainerStatusesResponse response =
-        proxy.getContainerStatuses(gcsRequest);
-    List<ContainerStatus> statuses = response.getContainerStatuses();
-
-    //test remote exception
-    boolean exception = false;
-    try {
-      StopContainersRequest stopRequest =
-          recordFactory.newRecordInstance(StopContainersRequest.class);
-      stopRequest.setContainerIds(containerIds);
-      proxy.stopContainers(stopRequest);
-      } catch (YarnException e) {
-      exception = true;
-      Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
-      Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
-      System.out.println("Test Exception is " + e.getMessage());
-    } catch (Exception ex) {
-      ex.printStackTrace();
-    }
-    Assert.assertTrue(exception);
-    
-    server.stop();
-    Assert.assertNotNull(statuses.get(0));
-    Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
-  }
-
-  public class DummyContainerManager implements ContainerManagementProtocol {
-
-    private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
-
-    @Override
-    public GetContainerStatusesResponse getContainerStatuses(
-        GetContainerStatusesRequest request)
-    throws YarnException {
-      GetContainerStatusesResponse response = 
-          recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
-      response.setContainerStatuses(statuses);
-      return response;
-    }
-
-    @Override
-    public StartContainersResponse startContainers(
-        StartContainersRequest requests) throws YarnException {
-      StartContainersResponse response =
-          recordFactory.newRecordInstance(StartContainersResponse.class);
-      for (StartContainerRequest request : requests.getStartContainerRequests()) {
-        Token containerToken = request.getContainerToken();
-        ContainerTokenIdentifier tokenId = null;
-
-        try {
-          tokenId = newContainerTokenIdentifier(containerToken);
-        } catch (IOException e) {
-          throw RPCUtil.getRemoteException(e);
-        }
-        ContainerStatus status =
-            recordFactory.newRecordInstance(ContainerStatus.class);
-        status.setState(ContainerState.RUNNING);
-        status.setContainerId(tokenId.getContainerID());
-        status.setExitStatus(0);
-        statuses.add(status);
-
-      }
-      return response;
-    }
-
-    @Override
-    public StopContainersResponse stopContainers(StopContainersRequest request) 
-    throws YarnException {
-      Exception e = new Exception(EXCEPTION_MSG, 
-          new Exception(EXCEPTION_CAUSE));
-      throw new YarnException(e);
-    }
-
-    @Override
-    public IncreaseContainersResourceResponse increaseContainersResource(
-        IncreaseContainersResourceRequest request) throws YarnException, IOException {
-      return null;
-    }
-
-    @Override
-    public SignalContainerResponse signalToContainer(
-        SignalContainerRequest request) throws YarnException {
-      final Exception e = new Exception(EXCEPTION_MSG,
-          new Exception(EXCEPTION_CAUSE));
-      throw new YarnException(e);
-    }
-
-    @Override
-    public ResourceLocalizationResponse localize(
-        ResourceLocalizationRequest request) throws YarnException, IOException {
-      return null;
-    }
-
-    @Override
-    public ReInitializeContainerResponse reInitializeContainer(
-        ReInitializeContainerRequest request) throws YarnException,
-        IOException {
-      return null;
-    }
-
-    @Override
-    public RestartContainerResponse restartContainer(ContainerId containerId)
-        throws YarnException, IOException {
-      return null;
-    }
-
-    @Override
-    public RollbackResponse rollbackLastReInitialization(
-        ContainerId containerId) throws YarnException, IOException {
-      return null;
-    }
-
-    @Override
-    public CommitResponse commitLastReInitialization(ContainerId containerId)
-        throws YarnException, IOException {
-      return null;
-    }
-  }
-
-  public static ContainerTokenIdentifier newContainerTokenIdentifier(
-      Token containerToken) throws IOException {
-    org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
-        new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>(
-            containerToken.getIdentifier()
-                .array(), containerToken.getPassword().array(), new Text(
-                containerToken.getKind()),
-            new Text(containerToken.getService()));
-    return token.decodeIdentifier();
-  }
-
-  public static Token newContainerToken(NodeId nodeId, byte[] password,
-      ContainerTokenIdentifier tokenIdentifier) {
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr =
-        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
-    Token containerToken =
-        Token.newInstance(tokenIdentifier.getBytes(),
-          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return containerToken;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
new file mode 100644
index 0000000..221969b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Test;
+import org.junit.Assert;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+
+public class TestTimelineServiceRecords {
+  private static final Log LOG =
+      LogFactory.getLog(TestTimelineServiceRecords.class);
+
+  @Test
+  public void testTimelineEntities() throws Exception {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setType("test type 1");
+    entity.setId("test id 1");
+    entity.addInfo("test info key 1", "test info value 1");
+    entity.addInfo("test info key 2",
+        Arrays.asList("test info value 2", "test info value 3"));
+    entity.addInfo("test info key 3", true);
+    Assert.assertTrue(
+        entity.getInfo().get("test info key 3") instanceof Boolean);
+    entity.addConfig("test config key 1", "test config value 1");
+    entity.addConfig("test config key 2", "test config value 2");
+
+    TimelineMetric metric1 =
+        new TimelineMetric(TimelineMetric.Type.TIME_SERIES);
+    metric1.setId("test metric id 1");
+    metric1.addValue(1L, 1.0F);
+    metric1.addValue(3L, 3.0D);
+    metric1.addValue(2L, 2);
+    Assert.assertEquals(TimelineMetric.Type.TIME_SERIES, metric1.getType());
+    Iterator<Map.Entry<Long, Number>> itr =
+        metric1.getValues().entrySet().iterator();
+    Map.Entry<Long, Number> entry = itr.next();
+    Assert.assertEquals(new Long(3L), entry.getKey());
+    Assert.assertEquals(3.0D, entry.getValue());
+    entry = itr.next();
+    Assert.assertEquals(new Long(2L), entry.getKey());
+    Assert.assertEquals(2, entry.getValue());
+    entry = itr.next();
+    Assert.assertEquals(new Long(1L), entry.getKey());
+    Assert.assertEquals(1.0F, entry.getValue());
+    Assert.assertFalse(itr.hasNext());
+    entity.addMetric(metric1);
+
+    TimelineMetric metric2 =
+        new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
+    metric2.setId("test metric id 1");
+    metric2.addValue(3L, (short) 3);
+    Assert.assertEquals(TimelineMetric.Type.SINGLE_VALUE, metric2.getType());
+    Assert.assertTrue(
+        metric2.getValues().values().iterator().next() instanceof Short);
+    Map<Long, Number> points = new HashMap<>();
+    points.put(4L, 4.0D);
+    points.put(5L, 5.0D);
+    try {
+      metric2.setValues(points);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Values cannot contain more than one point in"));
+    }
+    try {
+      metric2.addValues(points);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          "Values cannot contain more than one point in"));
+    }
+    entity.addMetric(metric2);
+
+    TimelineMetric metric3 =
+        new TimelineMetric(TimelineMetric.Type.SINGLE_VALUE);
+    metric3.setId("test metric id 1");
+    metric3.addValue(4L, (short) 4);
+    Assert.assertEquals("metric3 should equal to metric2! ", metric3, metric2);
+    Assert.assertNotEquals("metric1 should not equal to metric2! ",
+        metric1, metric2);
+
+    TimelineEvent event1 = new TimelineEvent();
+    event1.setId("test event id 1");
+    event1.addInfo("test info key 1", "test info value 1");
+    event1.addInfo("test info key 2",
+        Arrays.asList("test info value 2", "test info value 3"));
+    event1.addInfo("test info key 3", true);
+    Assert.assertTrue(
+        event1.getInfo().get("test info key 3") instanceof Boolean);
+    event1.setTimestamp(1L);
+    entity.addEvent(event1);
+
+    TimelineEvent event2 = new TimelineEvent();
+    event2.setId("test event id 2");
+    event2.addInfo("test info key 1", "test info value 1");
+    event2.addInfo("test info key 2",
+        Arrays.asList("test info value 2", "test info value 3"));
+    event2.addInfo("test info key 3", true);
+    Assert.assertTrue(
+        event2.getInfo().get("test info key 3") instanceof Boolean);
+    event2.setTimestamp(2L);
+    entity.addEvent(event2);
+
+    Assert.assertFalse("event1 should not equal to event2! ",
+        event1.equals(event2));
+    TimelineEvent event3 = new TimelineEvent();
+    event3.setId("test event id 1");
+    event3.setTimestamp(1L);
+    Assert.assertEquals("event1 should equal to event3! ", event3, event1);
+    Assert.assertNotEquals("event1 should not equal to event2! ",
+        event1, event2);
+
+    entity.setCreatedTime(0L);
+    entity.addRelatesToEntity("test type 2", "test id 2");
+    entity.addRelatesToEntity("test type 3", "test id 3");
+    entity.addIsRelatedToEntity("test type 4", "test id 4");
+    entity.addIsRelatedToEntity("test type 5", "test id 5");
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true));
+
+    TimelineEntities entities = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    entities.addEntity(entity1);
+    TimelineEntity entity2 = new TimelineEntity();
+    entities.addEntity(entity2);
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
+
+    Assert.assertFalse("entity 1 should not be valid without type and id",
+        entity1.isValid());
+    entity1.setId("test id 2");
+    entity1.setType("test type 2");
+    entity2.setId("test id 1");
+    entity2.setType("test type 1");
+
+    Assert.assertEquals("Timeline entity should equal to entity2! ",
+        entity, entity2);
+    Assert.assertNotEquals("entity1 should not equal to entity! ",
+        entity1, entity);
+    Assert.assertEquals("entity should be less than entity1! ",
+        entity1.compareTo(entity), 1);
+    Assert.assertEquals("entity's hash code should be -28727840 but not "
+        + entity.hashCode(), entity.hashCode(), -28727840);
+  }
+
+  @Test
+  public void testFirstClassCitizenEntities() throws Exception {
+    UserEntity user = new UserEntity();
+    user.setId("test user id");
+
+    QueueEntity queue = new QueueEntity();
+    queue.setId("test queue id");
+
+
+    ClusterEntity cluster = new ClusterEntity();
+    cluster.setId("test cluster id");
+
+    FlowRunEntity flow1 = new FlowRunEntity();
+    //flow1.setId("test flow id 1");
+    flow1.setUser(user.getId());
+    flow1.setName("test flow name 1");
+    flow1.setVersion("test flow version 1");
+    flow1.setRunId(1L);
+
+    FlowRunEntity flow2 = new FlowRunEntity();
+    //flow2.setId("test flow run id 2");
+    flow2.setUser(user.getId());
+    flow2.setName("test flow name 2");
+    flow2.setVersion("test flow version 2");
+    flow2.setRunId(2L);
+
+    ApplicationEntity app1 = new ApplicationEntity();
+    app1.setId(ApplicationId.newInstance(0, 1).toString());
+    app1.setQueue(queue.getId());
+
+    ApplicationEntity app2 = new ApplicationEntity();
+    app2.setId(ApplicationId.newInstance(0, 2).toString());
+    app2.setQueue(queue.getId());
+
+    ApplicationAttemptEntity appAttempt = new ApplicationAttemptEntity();
+    appAttempt.setId(ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0, 1), 1).toString());
+
+    ContainerEntity container = new ContainerEntity();
+    container.setId(ContainerId.newContainerId(
+        ApplicationAttemptId.newInstance(
+            ApplicationId.newInstance(0, 1), 1), 1).toString());
+
+    cluster.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(),
+        flow1.getId());
+    flow1
+        .setParent(TimelineEntityType.YARN_CLUSTER.toString(), cluster.getId());
+    flow1.addChild(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+    flow2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow1.getId());
+    flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(),
+        app1.getId());
+    flow2.addChild(TimelineEntityType.YARN_APPLICATION.toString(),
+        app2.getId());
+    app1.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+    app1.addChild(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+        appAttempt.getId());
+    appAttempt
+        .setParent(TimelineEntityType.YARN_APPLICATION.toString(),
+            app1.getId());
+    app2.setParent(TimelineEntityType.YARN_FLOW_RUN.toString(), flow2.getId());
+    appAttempt.addChild(TimelineEntityType.YARN_CONTAINER.toString(),
+        container.getId());
+    container.setParent(TimelineEntityType.YARN_APPLICATION_ATTEMPT.toString(),
+        appAttempt.getId());
+
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(cluster, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(flow1, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(flow2, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(app1, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(app2, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(appAttempt, true));
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(container, true));
+
+
+    // Check parent/children APIs
+    Assert.assertNotNull(app1.getParent());
+    Assert.assertEquals(flow2.getType(), app1.getParent().getType());
+    Assert.assertEquals(flow2.getId(), app1.getParent().getId());
+    app1.addInfo(ApplicationEntity.PARENT_INFO_KEY, "invalid parent object");
+    try {
+      app1.getParent();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof YarnRuntimeException);
+      Assert.assertTrue(e.getMessage().contains(
+          "Parent info is invalid identifier object"));
+    }
+
+    Assert.assertNotNull(app1.getChildren());
+    Assert.assertEquals(1, app1.getChildren().size());
+    Assert.assertEquals(
+        appAttempt.getType(), app1.getChildren().iterator().next().getType());
+    Assert.assertEquals(
+        appAttempt.getId(), app1.getChildren().iterator().next().getId());
+    app1.addInfo(ApplicationEntity.CHILDREN_INFO_KEY,
+        Collections.singletonList("invalid children set"));
+    try {
+      app1.getChildren();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof YarnRuntimeException);
+      Assert.assertTrue(e.getMessage().contains(
+          "Children info is invalid identifier set"));
+    }
+    app1.addInfo(ApplicationEntity.CHILDREN_INFO_KEY,
+        Collections.singleton("invalid child object"));
+    try {
+      app1.getChildren();
+      Assert.fail();
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof YarnRuntimeException);
+      Assert.assertTrue(e.getMessage().contains(
+          "Children info contains invalid identifier object"));
+    }
+  }
+
+  @Test
+  public void testUser() throws Exception {
+    UserEntity user = new UserEntity();
+    user.setId("test user id");
+    user.addInfo("test info key 1", "test info value 1");
+    user.addInfo("test info key 2", "test info value 2");
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(user, true));
+  }
+
+  @Test
+  public void testQueue() throws Exception {
+    QueueEntity queue = new QueueEntity();
+    queue.setId("test queue id");
+    queue.addInfo("test info key 1", "test info value 1");
+    queue.addInfo("test info key 2", "test info value 2");
+    queue.setParent(TimelineEntityType.YARN_QUEUE.toString(),
+        "test parent queue id");
+    queue.addChild(TimelineEntityType.YARN_QUEUE.toString(),
+        "test child queue id 1");
+    queue.addChild(TimelineEntityType.YARN_QUEUE.toString(),
+        "test child queue id 2");
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(queue, true));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
new file mode 100644
index 0000000..5813340
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestTimelineClientV2Impl.java
@@ -0,0 +1,378 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MultivaluedMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+public class TestTimelineClientV2Impl {
+  private static final Log LOG =
+      LogFactory.getLog(TestTimelineClientV2Impl.class);
+  private TestV2TimelineClient client;
+  private static final long TIME_TO_SLEEP = 150L;
+  private static final String EXCEPTION_MSG = "Exception in the content";
+
+  @Before
+  public void setup() {
+    conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
+    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
+    conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
+    if (!currTestName.getMethodName()
+        .contains("testRetryOnConnectionFailure")) {
+      client = createTimelineClient(conf);
+    }
+  }
+
+  @Rule
+  public TestName currTestName = new TestName();
+  private YarnConfiguration conf;
+
+  private TestV2TimelineClient createTimelineClient(YarnConfiguration config) {
+    ApplicationId id = ApplicationId.newInstance(0, 0);
+    TestV2TimelineClient tc = new TestV2TimelineClient(id);
+    tc.init(config);
+    tc.start();
+    return tc;
+  }
+
+  private class TestV2TimelineClientForExceptionHandling
+      extends TimelineClientImpl {
+    public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
+      super(id);
+    }
+
+    private boolean throwYarnException;
+
+    public void setThrowYarnException(boolean throwYarnException) {
+      this.throwYarnException = throwYarnException;
+    }
+
+    public boolean isThrowYarnException() {
+      return throwYarnException;
+    }
+
+    @Override
+    protected void putObjects(URI base, String path,
+        MultivaluedMap<String, String> params, Object obj)
+            throws IOException, YarnException {
+      if (throwYarnException) {
+        throw new YarnException(EXCEPTION_MSG);
+      } else {
+        throw new IOException(
+            "Failed to get the response from the timeline server.");
+      }
+    }
+  }
+
+  private class TestV2TimelineClient
+      extends TestV2TimelineClientForExceptionHandling {
+    private boolean sleepBeforeReturn;
+
+    private List<TimelineEntities> publishedEntities;
+
+    public TimelineEntities getPublishedEntities(int putIndex) {
+      Assert.assertTrue("Not So many entities Published",
+          putIndex < publishedEntities.size());
+      return publishedEntities.get(putIndex);
+    }
+
+    public void setSleepBeforeReturn(boolean sleepBeforeReturn) {
+      this.sleepBeforeReturn = sleepBeforeReturn;
+    }
+
+    public int getNumOfTimelineEntitiesPublished() {
+      return publishedEntities.size();
+    }
+
+    public TestV2TimelineClient(ApplicationId id) {
+      super(id);
+      publishedEntities = new ArrayList<TimelineEntities>();
+    }
+
+    protected void putObjects(String path,
+        MultivaluedMap<String, String> params, Object obj)
+            throws IOException, YarnException {
+      if (isThrowYarnException()) {
+        throw new YarnException("ActualException");
+      }
+      publishedEntities.add((TimelineEntities) obj);
+      if (sleepBeforeReturn) {
+        try {
+          Thread.sleep(TIME_TO_SLEEP);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testExceptionMultipleRetry() {
+    TestV2TimelineClientForExceptionHandling c =
+        new TestV2TimelineClientForExceptionHandling(
+            ApplicationId.newInstance(0, 0));
+    int maxRetries = 2;
+    conf.setInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+        maxRetries);
+    c.init(conf);
+    c.start();
+    c.setTimelineServiceAddress("localhost:12345");
+    try {
+      c.putEntities(new TimelineEntity());
+    } catch (IOException e) {
+      Assert.fail("YARN exception is expected");
+    } catch (YarnException e) {
+      Throwable cause = e.getCause();
+      Assert.assertTrue("IOException is expected",
+          cause instanceof IOException);
+      Assert.assertTrue("YARN exception is expected",
+          cause.getMessage().contains(
+              "TimelineClient has reached to max retry times : " + maxRetries));
+    }
+
+    c.setThrowYarnException(true);
+    try {
+      c.putEntities(new TimelineEntity());
+    } catch (IOException e) {
+      Assert.fail("YARN exception is expected");
+    } catch (YarnException e) {
+      Throwable cause = e.getCause();
+      Assert.assertTrue("YARN exception is expected",
+          cause instanceof YarnException);
+      Assert.assertTrue("YARN exception is expected",
+          cause.getMessage().contains(EXCEPTION_MSG));
+    }
+    c.stop();
+  }
+
+  @Test
+  public void testPostEntities() throws Exception {
+    try {
+      client.putEntities(generateEntity("1"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+  }
+
+  @Test
+  public void testASyncCallMerge() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      client.putEntitiesAsync(generateEntity("1"));
+      Thread.sleep(TIME_TO_SLEEP / 2);
+      // by the time first put response comes push 2 entities in the queue
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+    for (int i = 0; i < 4; i++) {
+      if (client.getNumOfTimelineEntitiesPublished() == 2) {
+        break;
+      }
+      Thread.sleep(TIME_TO_SLEEP);
+    }
+    Assert.assertEquals("two merged TimelineEntities needs to be published", 2,
+        client.getNumOfTimelineEntitiesPublished());
+    TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+    Assert.assertEquals(
+        "Merged TimelineEntities Object needs to 2 TimelineEntity Object", 2,
+        secondPublishedEntities.getEntities().size());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+        secondPublishedEntities.getEntities().get(0).getId());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+        secondPublishedEntities.getEntities().get(1).getId());
+  }
+
+  @Test
+  public void testSyncCall() throws Exception {
+    try {
+      // sync entity should not be be merged with Async
+      client.putEntities(generateEntity("1"));
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+      // except for the sync call above 2 should be merged
+      client.putEntities(generateEntity("4"));
+    } catch (YarnException e) {
+      Assert.fail("Exception is not expected");
+    }
+    for (int i = 0; i < 4; i++) {
+      if (client.getNumOfTimelineEntitiesPublished() == 3) {
+        break;
+      }
+      Thread.sleep(TIME_TO_SLEEP);
+    }
+    printReceivedEntities();
+    Assert.assertEquals("TimelineEntities not published as desired", 3,
+        client.getNumOfTimelineEntitiesPublished());
+    TimelineEntities firstPublishedEntities = client.getPublishedEntities(0);
+    Assert.assertEquals("sync entities should not be merged with async", 1,
+        firstPublishedEntities.getEntities().size());
+
+    // test before pushing the sync entities asyncs are merged and pushed
+    TimelineEntities secondPublishedEntities = client.getPublishedEntities(1);
+    Assert.assertEquals(
+        "async entities should be merged before publishing sync", 2,
+        secondPublishedEntities.getEntities().size());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "2",
+        secondPublishedEntities.getEntities().get(0).getId());
+    Assert.assertEquals("Order of Async Events Needs to be FIFO", "3",
+        secondPublishedEntities.getEntities().get(1).getId());
+
+    // test the last entity published is sync put
+    TimelineEntities thirdPublishedEntities = client.getPublishedEntities(2);
+    Assert.assertEquals("sync entities had to be published at the last", 1,
+        thirdPublishedEntities.getEntities().size());
+    Assert.assertEquals("Expected last sync Event is not proper", "4",
+        thirdPublishedEntities.getEntities().get(0).getId());
+  }
+
+  @Test
+  public void testExceptionCalls() throws Exception {
+    client.setThrowYarnException(true);
+    try {
+      client.putEntitiesAsync(generateEntity("1"));
+    } catch (YarnException e) {
+      Assert.fail("Async calls are not expected to throw exception");
+    }
+
+    try {
+      client.putEntities(generateEntity("2"));
+      Assert.fail("Sync calls are expected to throw exception");
+    } catch (YarnException e) {
+      Assert.assertEquals("Same exception needs to be thrown",
+          "ActualException", e.getCause().getMessage());
+    }
+  }
+
+  @Test
+  public void testConfigurableNumberOfMerges() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      // At max 3 entities need to be merged
+      client.putEntitiesAsync(generateEntity("1"));
+      client.putEntitiesAsync(generateEntity("2"));
+      client.putEntitiesAsync(generateEntity("3"));
+      client.putEntitiesAsync(generateEntity("4"));
+      client.putEntities(generateEntity("5"));
+      client.putEntitiesAsync(generateEntity("6"));
+      client.putEntitiesAsync(generateEntity("7"));
+      client.putEntitiesAsync(generateEntity("8"));
+      client.putEntitiesAsync(generateEntity("9"));
+      client.putEntitiesAsync(generateEntity("10"));
+    } catch (YarnException e) {
+      Assert.fail("No exception expected");
+    }
+    // not having the same logic here as it doesn't depend on how many times
+    // events are published.
+    Thread.sleep(2 * TIME_TO_SLEEP);
+    printReceivedEntities();
+    for (TimelineEntities publishedEntities : client.publishedEntities) {
+      Assert.assertTrue(
+          "Number of entities should not be greater than 3 for each publish,"
+              + " but was " + publishedEntities.getEntities().size(),
+          publishedEntities.getEntities().size() <= 3);
+    }
+  }
+
+  @Test
+  public void testAfterStop() throws Exception {
+    client.setSleepBeforeReturn(true);
+    try {
+      // At max 3 entities need to be merged
+      client.putEntities(generateEntity("1"));
+      for (int i = 2; i < 20; i++) {
+        client.putEntitiesAsync(generateEntity("" + i));
+      }
+      client.stop();
+      try {
+        client.putEntitiesAsync(generateEntity("50"));
+        Assert.fail("Exception expected");
+      } catch (YarnException e) {
+        // expected
+      }
+    } catch (YarnException e) {
+      Assert.fail("No exception expected");
+    }
+    // not having the same logic here as it doesn't depend on how many times
+    // events are published.
+    for (int i = 0; i < 5; i++) {
+      TimelineEntities publishedEntities =
+          client.publishedEntities.get(client.publishedEntities.size() - 1);
+      TimelineEntity timelineEntity = publishedEntities.getEntities()
+          .get(publishedEntities.getEntities().size() - 1);
+      if (!timelineEntity.getId().equals("19")) {
+        Thread.sleep(2 * TIME_TO_SLEEP);
+      }
+    }
+    printReceivedEntities();
+    TimelineEntities publishedEntities =
+        client.publishedEntities.get(client.publishedEntities.size() - 1);
+    TimelineEntity timelineEntity = publishedEntities.getEntities()
+        .get(publishedEntities.getEntities().size() - 1);
+    Assert.assertEquals("", "19", timelineEntity.getId());
+  }
+
+  private void printReceivedEntities() {
+    for (int i = 0; i < client.getNumOfTimelineEntitiesPublished(); i++) {
+      TimelineEntities publishedEntities = client.getPublishedEntities(i);
+      StringBuilder entitiesPerPublish = new StringBuilder();
+      for (TimelineEntity entity : publishedEntities.getEntities()) {
+        entitiesPerPublish.append(entity.getId());
+        entitiesPerPublish.append(",");
+      }
+      LOG.info("Entities Published @ index " + i + " : "
+          + entitiesPerPublish.toString());
+    }
+  }
+
+  private static TimelineEntity generateEntity(String id) {
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType("testEntity");
+    entity.setCreatedTime(System.currentTimeMillis());
+    return entity;
+  }
+
+  @After
+  public void tearDown() {
+    if (client != null) {
+      client.stop();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java
new file mode 100644
index 0000000..d3d815b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestTimelineServiceHelper.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.util;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestTimelineServiceHelper {
+
+  @Test
+  public void testMapCastToHashMap() {
+
+    // Test null map be casted to null
+    Map<String, String> nullMap = null;
+    Assert.assertNull(TimelineServiceHelper.mapCastToHashMap(nullMap));
+
+    // Test empty hashmap be casted to a empty hashmap
+    Map<String, String> emptyHashMap = new HashMap<String, String>();
+    Assert.assertEquals(
+        TimelineServiceHelper.mapCastToHashMap(emptyHashMap).size(), 0);
+
+    // Test empty non-hashmap be casted to a empty hashmap
+    Map<String, String> emptyTreeMap = new TreeMap<String, String>();
+    Assert.assertEquals(
+        TimelineServiceHelper.mapCastToHashMap(emptyTreeMap).size(), 0);
+
+    // Test non-empty hashmap be casted to hashmap correctly
+    Map<String, String> firstHashMap = new HashMap<String, String>();
+    String key = "KEY";
+    String value = "VALUE";
+    firstHashMap.put(key, value);
+    Assert.assertEquals(
+        TimelineServiceHelper.mapCastToHashMap(firstHashMap), firstHashMap);
+
+    // Test non-empty non-hashmap is casted correctly.
+    Map<String, String> firstTreeMap = new TreeMap<String, String>();
+    firstTreeMap.put(key, value);
+    HashMap<String, String> alternateHashMap =
+        TimelineServiceHelper.mapCastToHashMap(firstTreeMap);
+    Assert.assertEquals(firstTreeMap.size(), alternateHashMap.size());
+    Assert.assertEquals(alternateHashMap.get(key), value);
+
+    // Test complicated hashmap be casted correctly
+    Map<String, Set<String>> complicatedHashMap =
+        new HashMap<String, Set<String>>();
+    Set<String> hashSet = new HashSet<String>();
+    hashSet.add(value);
+    complicatedHashMap.put(key, hashSet);
+    Assert.assertEquals(
+        TimelineServiceHelper.mapCastToHashMap(complicatedHashMap),
+        complicatedHashMap);
+
+    // Test complicated non-hashmap get casted correctly
+    Map<String, Set<String>> complicatedTreeMap =
+        new TreeMap<String, Set<String>>();
+    complicatedTreeMap.put(key, hashSet);
+    Assert.assertEquals(
+        TimelineServiceHelper.mapCastToHashMap(complicatedTreeMap).get(key),
+        hashSet);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index cd131d1..c30db0a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -150,6 +150,7 @@
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>ResourceTracker.proto</include>
                   <include>SCMUploader.proto</include>
+                  <include>collectornodemanager_protocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
new file mode 100644
index 0000000..64eea63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocol.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+
+/**
+ * <p>The protocol between an <code>TimelineCollectorManager</code> and a
+ * <code>NodeManager</code> to report a new application collector get launched.
+ * </p>
+ *
+ */
+@Private
+public interface CollectorNodemanagerProtocol {
+
+  /**
+   *
+   * <p>
+   * The <code>TimelineCollectorManager</code> provides a list of mapping
+   * between application and collector's address in
+   * {@link ReportNewCollectorInfoRequest} to a <code>NodeManager</code> to
+   * <em>register</em> collector's info, include: applicationId and REST URI to
+   * access collector. NodeManager will add them into registered collectors
+   * and register them into <code>ResourceManager</code> afterwards.
+   * </p>
+   *
+   * @param request the request of registering a new collector or a list of
+   *                collectors
+   * @return the response for registering the new collector
+   * @throws YarnException if the request is invalid
+   * @throws IOException if there are I/O errors
+   */
+  ReportNewCollectorInfoResponse reportNewCollectorInfo(
+      ReportNewCollectorInfoRequest request)
+      throws YarnException, IOException;
+
+  /**
+   * <p>
+   * The collector needs to get the context information including user, flow
+   * and flow run ID to associate with every incoming put-entity requests.
+   * </p>
+   * @param request the request of getting the aggregator context information of
+   *                the given application
+   * @return the response for registering the new collector
+   * @throws YarnException if the request is invalid
+   * @throws IOException if there are I/O errors
+   */
+  GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException;
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
new file mode 100644
index 0000000..24f7c3d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/CollectorNodemanagerProtocolPB.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.CollectorNodemanagerProtocol.CollectorNodemanagerProtocolService;
+
+@Private
+@Unstable
+@ProtocolInfo(
+    protocolName =
+        "org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB",
+    protocolVersion = 1)
+public interface CollectorNodemanagerProtocolPB extends
+    CollectorNodemanagerProtocolService.BlockingInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
new file mode 100644
index 0000000..bc50ac5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/CollectorNodemanagerProtocolPBClientImpl.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class CollectorNodemanagerProtocolPBClientImpl implements
+    CollectorNodemanagerProtocol, Closeable {
+
+  // Not a documented config. Only used for tests internally
+  static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+      + "rpc.nm-command-timeout";
+
+  /**
+   * Maximum of 1 minute timeout for a Node to react to the command.
+   */
+  static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+
+  private CollectorNodemanagerProtocolPB proxy;
+
+  @Private
+  public CollectorNodemanagerProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, CollectorNodemanagerProtocolPB.class,
+        ProtobufRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
+    proxy =
+        (CollectorNodemanagerProtocolPB) RPC.getProxy(
+            CollectorNodemanagerProtocolPB.class,
+            clientVersion, addr, ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf), expireIntvl);
+  }
+
+  @Override
+  public ReportNewCollectorInfoResponse reportNewCollectorInfo(
+      ReportNewCollectorInfoRequest request) throws YarnException, IOException {
+
+    ReportNewCollectorInfoRequestProto requestProto =
+        ((ReportNewCollectorInfoRequestPBImpl) request).getProto();
+    try {
+      return new ReportNewCollectorInfoResponsePBImpl(
+          proxy.reportNewCollectorInfo(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public GetTimelineCollectorContextResponse getTimelineCollectorContext(
+      GetTimelineCollectorContextRequest request)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequestProto requestProto =
+        ((GetTimelineCollectorContextRequestPBImpl) request).getProto();
+    try {
+      return new GetTimelineCollectorContextResponsePBImpl(
+          proxy.getTimelineCollectorContext(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
new file mode 100644
index 0000000..7b93a68
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/CollectorNodemanagerProtocolPBServiceImpl.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.GetTimelineCollectorContextResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewCollectorInfoResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class CollectorNodemanagerProtocolPBServiceImpl implements
+    CollectorNodemanagerProtocolPB {
+
+  private CollectorNodemanagerProtocol real;
+
+  public CollectorNodemanagerProtocolPBServiceImpl(
+      CollectorNodemanagerProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public ReportNewCollectorInfoResponseProto reportNewCollectorInfo(
+      RpcController arg0, ReportNewCollectorInfoRequestProto proto)
+      throws ServiceException {
+    ReportNewCollectorInfoRequestPBImpl request =
+        new ReportNewCollectorInfoRequestPBImpl(proto);
+    try {
+      ReportNewCollectorInfoResponse response =
+          real.reportNewCollectorInfo(request);
+      return ((ReportNewCollectorInfoResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetTimelineCollectorContextResponseProto getTimelineCollectorContext(
+      RpcController controller,
+      GetTimelineCollectorContextRequestProto proto) throws ServiceException {
+    GetTimelineCollectorContextRequestPBImpl request =
+        new GetTimelineCollectorContextRequestPBImpl(proto);
+    try {
+      GetTimelineCollectorContextResponse response =
+          real.getTimelineCollectorContext(request);
+      return ((GetTimelineCollectorContextResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
new file mode 100644
index 0000000..604a40b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextRequest.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextRequest {
+
+  public static GetTimelineCollectorContextRequest newInstance(
+      ApplicationId appId) {
+    GetTimelineCollectorContextRequest request =
+        Records.newRecord(GetTimelineCollectorContextRequest.class);
+    request.setApplicationId(appId);
+    return request;
+  }
+
+  public abstract ApplicationId getApplicationId();
+
+  public abstract void setApplicationId(ApplicationId appId);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
new file mode 100644
index 0000000..bd5c11e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/GetTimelineCollectorContextResponse.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class GetTimelineCollectorContextResponse {
+
+  public static GetTimelineCollectorContextResponse newInstance(
+      String userId, String flowName, String flowVersion, long flowRunId) {
+    GetTimelineCollectorContextResponse response =
+        Records.newRecord(GetTimelineCollectorContextResponse.class);
+    response.setUserId(userId);
+    response.setFlowName(flowName);
+    response.setFlowVersion(flowVersion);
+    response.setFlowRunId(flowRunId);
+    return response;
+  }
+
+  public abstract String getUserId();
+
+  public abstract void setUserId(String userId);
+
+  public abstract String getFlowName();
+
+  public abstract void setFlowName(String flowName);
+
+  public abstract String getFlowVersion();
+
+  public abstract void setFlowVersion(String flowVersion);
+
+  public abstract long getFlowRunId();
+
+  public abstract void setFlowRunId(long flowRunId);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index 84ca8a4..c795e55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -19,8 +19,10 @@
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
@@ -42,6 +44,22 @@ public abstract class NodeHeartbeatRequest {
     return nodeHeartbeatRequest;
   }
 
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey, Set<NodeLabel> nodeLabels,
+      Map<ApplicationId, String> registeredCollectors) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    nodeHeartbeatRequest.setNodeLabels(nodeLabels);
+    nodeHeartbeatRequest.setRegisteredCollectors(registeredCollectors);
+    return nodeHeartbeatRequest;
+  }
+
   public abstract NodeStatus getNodeStatus();
   public abstract void setNodeStatus(NodeStatus status);
 
@@ -59,4 +77,9 @@ public abstract class NodeHeartbeatRequest {
 
   public abstract void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationReportsForApps);
+
+  // This tells RM registered collectors' address info on this node
+  public abstract Map<ApplicationId, String> getRegisteredCollectors();
+  public abstract void setRegisteredCollectors(Map<ApplicationId,
+      String> appCollectorsMap);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 386353a..09cafaf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -41,6 +41,10 @@ public interface NodeHeartbeatResponse {
 
   List<ApplicationId> getApplicationsToCleanup();
 
+  // This tells NM the collectors' address info of related apps
+  Map<ApplicationId, String> getAppCollectorsMap();
+  void setAppCollectorsMap(Map<ApplicationId, String> appCollectorsMap);
+
   void setResponseId(int responseId);
   void setNodeAction(NodeAction action);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
new file mode 100644
index 0000000..3498de9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.AppCollectorsMap;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+public abstract class ReportNewCollectorInfoRequest {
+
+  public static ReportNewCollectorInfoRequest newInstance(
+      List<AppCollectorsMap> appCollectorsList) {
+    ReportNewCollectorInfoRequest request =
+        Records.newRecord(ReportNewCollectorInfoRequest.class);
+    request.setAppCollectorsList(appCollectorsList);
+    return request;
+  }
+
+  public static ReportNewCollectorInfoRequest newInstance(
+      ApplicationId id, String collectorAddr) {
+    ReportNewCollectorInfoRequest request =
+        Records.newRecord(ReportNewCollectorInfoRequest.class);
+    request.setAppCollectorsList(
+        Arrays.asList(AppCollectorsMap.newInstance(id, collectorAddr)));
+    return request;
+  }
+
+  public abstract List<AppCollectorsMap> getAppCollectorsList();
+
+  public abstract void setAppCollectorsList(
+      List<AppCollectorsMap> appCollectorsList);
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
new file mode 100644
index 0000000..4157c47
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewCollectorInfoResponse.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ReportNewCollectorInfoResponse {
+
+  @Private
+  public static ReportNewCollectorInfoResponse newInstance() {
+    ReportNewCollectorInfoResponse response =
+        Records.newRecord(ReportNewCollectorInfoResponse.class);
+    return response;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
new file mode 100644
index 0000000..7014388
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextRequestPBImpl.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextRequestProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+
+public class GetTimelineCollectorContextRequestPBImpl extends
+    GetTimelineCollectorContextRequest {
+
+  private GetTimelineCollectorContextRequestProto
+      proto = GetTimelineCollectorContextRequestProto.getDefaultInstance();
+  private GetTimelineCollectorContextRequestProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private ApplicationId appId = null;
+
+  public GetTimelineCollectorContextRequestPBImpl() {
+    builder = GetTimelineCollectorContextRequestProto.newBuilder();
+  }
+
+  public GetTimelineCollectorContextRequestPBImpl(
+      GetTimelineCollectorContextRequestProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public GetTimelineCollectorContextRequestProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToBuilder() {
+    if (appId != null) {
+      builder.setAppId(convertToProtoFormat(this.appId));
+    }
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextRequestProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    if (this.appId != null) {
+      return this.appId;
+    }
+
+    GetTimelineCollectorContextRequestProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasAppId()) {
+      return null;
+    }
+
+    this.appId = convertFromProtoFormat(p.getAppId());
+    return this.appId;
+  }
+
+  @Override
+  public void setApplicationId(ApplicationId id) {
+    maybeInitBuilder();
+    if (id == null) {
+      builder.clearAppId();
+    }
+    this.appId = id;
+  }
+
+  private ApplicationIdPBImpl convertFromProtoFormat(
+      YarnProtos.ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+
+  private YarnProtos.ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl)t).getProto();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
new file mode 100644
index 0000000..151b036
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/GetTimelineCollectorContextResponsePBImpl.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+
+
+import com.google.protobuf.TextFormat;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.GetTimelineCollectorContextResponseProto;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+
+public class GetTimelineCollectorContextResponsePBImpl extends
+    GetTimelineCollectorContextResponse {
+
+  private GetTimelineCollectorContextResponseProto proto =
+      GetTimelineCollectorContextResponseProto.getDefaultInstance();
+  private GetTimelineCollectorContextResponseProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public GetTimelineCollectorContextResponsePBImpl() {
+    builder = GetTimelineCollectorContextResponseProto.newBuilder();
+  }
+
+  public GetTimelineCollectorContextResponsePBImpl(
+      GetTimelineCollectorContextResponseProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public GetTimelineCollectorContextResponseProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = GetTimelineCollectorContextResponseProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public String getUserId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasUserId()) {
+      return null;
+    }
+    return p.getUserId();
+  }
+
+  @Override
+  public void setUserId(String userId) {
+    maybeInitBuilder();
+    if (userId == null) {
+      builder.clearUserId();
+      return;
+    }
+    builder.setUserId(userId);
+  }
+
+  @Override
+  public String getFlowName() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasFlowName()) {
+      return null;
+    }
+    return p.getFlowName();
+  }
+
+  @Override
+  public void setFlowName(String flowName) {
+    maybeInitBuilder();
+    if (flowName == null) {
+      builder.clearFlowName();
+      return;
+    }
+    builder.setFlowName(flowName);
+  }
+
+  @Override
+  public String getFlowVersion() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    if (!p.hasFlowVersion()) {
+      return null;
+    }
+    return p.getFlowVersion();
+  }
+
+  @Override
+  public void setFlowVersion(String flowVersion) {
+    maybeInitBuilder();
+    if (flowVersion == null) {
+      builder.clearFlowVersion();
+      return;
+    }
+    builder.setFlowVersion(flowVersion);
+  }
+
+  @Override
+  public long getFlowRunId() {
+    GetTimelineCollectorContextResponseProtoOrBuilder p =
+        viaProto ? proto : builder;
+    return p.getFlowRunId();
+  }
+
+  @Override
+  public void setFlowRunId(long flowRunId) {
+    maybeInitBuilder();
+    builder.setFlowRunId(flowRunId);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message