hadoop-common-commits mailing list archives

From zjs...@apache.org
Subject [50/50] hadoop git commit: YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.
Date Wed, 18 Mar 2015 03:34:42 GMT
YARN-3039. Implemented the app-level timeline aggregator discovery service. Contributed by Junping Du.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8a637914
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8a637914
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8a637914

Branch: refs/heads/YARN-2928
Commit: 8a637914c13baae6749b481551901cfac94694f4
Parents: 5de4026
Author: Zhijie Shen <zjshen@apache.org>
Authored: Tue Mar 17 20:23:49 2015 -0700
Committer: Zhijie Shen <zjshen@apache.org>
Committed: Tue Mar 17 20:23:49 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../api/protocolrecords/AllocateResponse.java   |  33 ++
 .../hadoop/yarn/conf/YarnConfiguration.java     |  12 +
 .../src/main/proto/yarn_service_protos.proto    |   1 +
 .../distributedshell/ApplicationMaster.java     |  82 ++++-
 .../hadoop/yarn/client/api/AMRMClient.java      |  18 +
 .../yarn/client/api/async/AMRMClientAsync.java  |  17 +
 .../api/async/impl/AMRMClientAsyncImpl.java     |  15 +-
 .../impl/pb/AllocateResponsePBImpl.java         |  17 +
 .../hadoop/yarn/client/api/TimelineClient.java  |   6 +-
 .../client/api/impl/TimelineClientImpl.java     | 133 ++++++-
 .../hadoop/yarn/webapp/util/WebAppUtils.java    |   2 +-
 .../src/main/resources/yarn-default.xml         |  13 +
 .../hadoop/yarn/TestContainerLaunchRPC.java     |  16 +-
 .../java/org/apache/hadoop/yarn/TestRPC.java    | 247 -------------
 .../hadoop/yarn/api/TestAllocateResponse.java   |  17 +
 .../hadoop-yarn-server-common/pom.xml           |   1 +
 .../api/AggregatorNodemanagerProtocol.java      |  56 +++
 .../api/AggregatorNodemanagerProtocolPB.java    |  33 ++
 ...gregatorNodemanagerProtocolPBClientImpl.java |  94 +++++
 ...regatorNodemanagerProtocolPBServiceImpl.java |  61 ++++
 .../protocolrecords/NodeHeartbeatRequest.java   |  23 ++
 .../protocolrecords/NodeHeartbeatResponse.java  |   4 +
 .../ReportNewAggregatorsInfoRequest.java        |  53 +++
 .../ReportNewAggregatorsInfoResponse.java       |  32 ++
 .../impl/pb/NodeHeartbeatRequestPBImpl.java     |  61 ++++
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  47 +++
 .../ReportNewAggregatorsInfoRequestPBImpl.java  | 142 ++++++++
 .../ReportNewAggregatorsInfoResponsePBImpl.java |  74 ++++
 .../server/api/records/AppAggregatorsMap.java   |  33 ++
 .../impl/pb/AppAggregatorsMapPBImpl.java        | 151 ++++++++
 .../proto/aggregatornodemanager_protocol.proto  |  29 ++
 .../yarn_server_common_service_protos.proto     |  21 ++
 .../java/org/apache/hadoop/yarn/TestRPC.java    | 345 +++++++++++++++++++
 .../hadoop/yarn/TestYarnServerApiClasses.java   |  17 +
 .../hadoop/yarn/server/nodemanager/Context.java |  13 +
 .../yarn/server/nodemanager/NodeManager.java    |  46 ++-
 .../nodemanager/NodeStatusUpdaterImpl.java      |   7 +-
 .../aggregatormanager/NMAggregatorService.java  | 113 ++++++
 .../application/ApplicationImpl.java            |   4 +
 .../ApplicationMasterService.java               |   6 +
 .../resourcemanager/ResourceTrackerService.java |  68 +++-
 .../server/resourcemanager/rmapp/RMApp.java     |  17 +
 .../rmapp/RMAppAggregatorUpdateEvent.java       |  36 ++
 .../resourcemanager/rmapp/RMAppEventType.java   |   3 +
 .../server/resourcemanager/rmapp/RMAppImpl.java |  51 ++-
 .../applicationsmanager/MockAsm.java            |  12 +
 .../server/resourcemanager/rmapp/MockRMApp.java |  15 +
 .../PerNodeTimelineAggregatorsAuxService.java   |   5 +-
 .../TimelineAggregatorsCollection.java          |  78 ++++-
 .../TestTimelineAggregatorsCollection.java      |  11 +-
 51 files changed, 2103 insertions(+), 291 deletions(-)
----------------------------------------------------------------------
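A minimal usage sketch (not part of the patch) of the new discovery wiring, based on the ApplicationMaster and AMRMClientAsyncImpl changes below; conf, appAttemptID, and allocListener follow the ApplicationMaster code in this patch:

import org.apache.hadoop.yarn.client.api.TimelineClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

// Create an app-scoped TimelineClient for the new timeline service.
TimelineClient timelineClient =
    TimelineClient.createTimelineClient(appAttemptID.getApplicationId());
timelineClient.init(conf);
timelineClient.start();

// Register it with AMRMClientAsync before start(); the allocate heartbeat then
// reads AllocateResponse.getAggregatorAddr() and pushes any newly discovered
// aggregator address into the client via setTimelineServiceAddress().
AMRMClientAsync<ContainerRequest> amRMClient =
    AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
amRMClient.init(conf);
amRMClient.registerTimelineClient(timelineClient);
amRMClient.start();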


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index e62bcf9..47351c6 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -29,6 +29,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3264. Created backing storage write interface and a POC only FS based
     storage implementation. (Vrushali C via zjshen)
 
+    YARN-3039. Implemented the app-level timeline aggregator discovery service.
+    (Junping Du via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
index c4fdb79..421c2a0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
@@ -120,6 +120,25 @@ public abstract class AllocateResponse {
     response.setAMRMToken(amRMToken);
     return response;
   }
+  
+  @Public
+  @Unstable
+  public static AllocateResponse newInstance(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, AMCommand command, int numClusterNodes,
+      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
+      List<ContainerResourceIncrease> increasedContainers,
+      List<ContainerResourceDecrease> decreasedContainers,
+      String aggregatorAddr) {
+    AllocateResponse response =
+        newInstance(responseId, completedContainers, allocatedContainers,
+          updatedNodes, availResources, command, numClusterNodes, preempt,
+          nmTokens, increasedContainers, decreasedContainers);
+    response.setAMRMToken(amRMToken);
+    response.setAggregatorAddr(aggregatorAddr);
+    return response;
+  }
 
   /**
    * If the <code>ResourceManager</code> needs the
@@ -304,4 +323,18 @@ public abstract class AllocateResponse {
   @Private
   @Unstable
   public abstract void setAMRMToken(Token amRMToken);
+  
+  /**
+   * The address of the aggregator that belongs to this application.
+   *
+   * @return the address of the aggregator that belongs to this application attempt
+   */
+  @Public
+  @Unstable
+  public abstract String getAggregatorAddr();
+  
+  @Private
+  @Unstable
+  public abstract void setAggregatorAddr(String aggregatorAddr);
+  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 4512086..a987044 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -643,6 +643,11 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "container-manager.thread-count";
   public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
   
+  /** Number of threads the aggregator service uses.*/
+  public static final String NM_AGGREGATOR_SERVICE_THREAD_COUNT =
+    NM_PREFIX + "aggregator-service.thread-count";
+  public static final int DEFAULT_NM_AGGREGATOR_SERVICE_THREAD_COUNT = 5;
+  
   /** Number of threads used in cleanup.*/
   public static final String NM_DELETE_THREAD_COUNT = 
     NM_PREFIX +  "delete.thread-count";
@@ -670,6 +675,13 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
     DEFAULT_NM_LOCALIZER_PORT;
   
+  /** Address where the aggregator service IPC is.*/
+  public static final String NM_AGGREGATOR_SERVICE_ADDRESS =
+    NM_PREFIX + "aggregator-service.address";
+  public static final int DEFAULT_NM_AGGREGATOR_SERVICE_PORT = 8048;
+  public static final String DEFAULT_NM_AGGREGATOR_SERVICE_ADDRESS = 
+      "0.0.0.0:" + DEFAULT_NM_AGGREGATOR_SERVICE_PORT;
+  
   /** Interval in between cache cleanups.*/
   public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
     NM_PREFIX + "localizer.cache.cleanup.interval-ms";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index 33d1207..4ae4806 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -87,6 +87,7 @@ message AllocateResponseProto {
   repeated ContainerResourceIncreaseProto increased_containers = 10;
   repeated ContainerResourceDecreaseProto decreased_containers = 11;
   optional hadoop.common.TokenProto am_rm_token = 12;
+  optional string aggregator_addr = 13;
 }
 
 enum SchedulerResourceTypes {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index db49166..3a19ac2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -38,6 +38,9 @@ import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
@@ -100,6 +103,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.log4j.LogManager;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * An ApplicationMaster for executing shell commands on a set of launched
@@ -209,6 +213,13 @@ public class ApplicationMaster {
   private String appMasterTrackingUrl = "";
   
   private boolean newTimelineService = false;
+  
+  // For posting entities to the new timeline service in a non-blocking way
+  // TODO replace with event loop in TimelineClient.
+  private static ExecutorService threadPool = 
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+          .build());
 
   // App Master configuration
   // No. of containers to run shell command on
@@ -291,6 +302,19 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
+      
+      threadPool.shutdown();
+      
+      while (!threadPool.isTerminated()) { // wait for all posting threads to finish
+        try {
+          if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
+            threadPool.shutdownNow(); // send interrupt to hurry them along
+          }
+        } catch (InterruptedException e) {
+          LOG.warn("Timeline client service stop interrupted!");
+          break;
+        }
+      }
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -515,8 +539,12 @@ public class ApplicationMaster {
         }
       }
       // Creating the Timeline Client
-      timelineClient = TimelineClient.createTimelineClient(
-          appAttemptID.getApplicationId());
+      if (newTimelineService) {
+        timelineClient = TimelineClient.createTimelineClient(
+            appAttemptID.getApplicationId());
+      } else {
+        timelineClient = TimelineClient.createTimelineClient();
+      }
       timelineClient.init(conf);
       timelineClient.start();
     } else {
@@ -589,8 +617,10 @@ public class ApplicationMaster {
     AMRMClientAsync.CallbackHandler allocListener = new RMCallbackHandler();
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
     amRMClient.init(conf);
+    // Need to register the timeline client before start().
+    amRMClient.registerTimelineClient(timelineClient);
     amRMClient.start();
-
+   
     containerListener = createNMCallbackHandler();
     nmClientAsync = new NMClientAsyncImpl(containerListener);
     nmClientAsync.init(conf);
@@ -727,7 +757,7 @@ public class ApplicationMaster {
     if(timelineClient != null) {
       timelineClient.stop();
     }
-
+    
     return success;
   }
   
@@ -1192,6 +1222,18 @@ public class ApplicationMaster {
   }
   
   private static void publishContainerStartEventOnNewTimelineService(
+      final TimelineClient timelineClient, final Container container, 
+      final String domainId, final UserGroupInformation ugi) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        publishContainerStartEventOnNewTimelineServiceBase(timelineClient, 
+            container, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+
+  private static void publishContainerStartEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, Container container, String domainId,
       UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
@@ -1223,10 +1265,22 @@ public class ApplicationMaster {
           e instanceof UndeclaredThrowableException ? e.getCause() : e);
     }
   }
-
+  
   private static void publishContainerEndEventOnNewTimelineService(
-      final TimelineClient timelineClient, ContainerStatus container,
-      String domainId, UserGroupInformation ugi) {
+      final TimelineClient timelineClient, final ContainerStatus container,
+      final String domainId, final UserGroupInformation ugi) {
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+          publishContainerEndEventOnNewTimelineServiceBase(timelineClient, 
+              container, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  private static void publishContainerEndEventOnNewTimelineServiceBase(
+      final TimelineClient timelineClient, final ContainerStatus container,
+      final String domainId, final UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 
         new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity();
     entity.setId(container.getContainerId().toString());
@@ -1257,6 +1311,20 @@ public class ApplicationMaster {
   }
 
   private static void publishApplicationAttemptEventOnNewTimelineService(
+      final TimelineClient timelineClient, final String appAttemptId,
+      final DSEvent appEvent, final String domainId, 
+      final UserGroupInformation ugi) {
+  
+    Runnable publishWrapper = new Runnable() {
+      public void run() {
+        publishApplicationAttemptEventOnNewTimelineServiceBase(timelineClient, 
+            appAttemptId, appEvent, domainId, ugi);
+      }
+    };
+    threadPool.execute(publishWrapper);
+  }
+  
+  private static void publishApplicationAttemptEventOnNewTimelineServiceBase(
       final TimelineClient timelineClient, String appAttemptId,
       DSEvent appEvent, String domainId, UserGroupInformation ugi) {
     final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
index bfe10d6..56f2b10 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/AMRMClient.java
@@ -47,6 +47,8 @@ import com.google.common.collect.ImmutableList;
 public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
     AbstractService {
   private static final Log LOG = LogFactory.getLog(AMRMClient.class);
+  
+  private TimelineClient timelineClient;
 
   /**
    * Create a new instance of AMRMClient.
@@ -374,6 +376,22 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
   }
 
   /**
+   * Register a TimelineClient with this AMRMClient.
+   * @param timelineClient the timeline client to register
+   */
+  public void registerTimelineClient(TimelineClient timelineClient) {
+    this.timelineClient = timelineClient;
+  }
+  
+  /**
+   * Get the registered timeline client.
+   * @return the registered timeline client
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return this.timelineClient;
+  }
+  
+  /**
    * Wait for <code>check</code> to return true for each 1000 ms.
    * See also {@link #waitFor(com.google.common.base.Supplier, int)}
    * and {@link #waitFor(com.google.common.base.Supplier, int, int)}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
index f62e71b..be5610e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/AMRMClientAsync.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -193,6 +194,22 @@ extends AbstractService {
    * @return Current number of nodes in the cluster
    */
   public abstract int getClusterNodeCount();
+  
+  /**
+   * Register a TimelineClient with the AMRMClient.
+   * @param timelineClient the timeline client to register
+   */
+  public void registerTimelineClient(TimelineClient timelineClient) {
+    client.registerTimelineClient(timelineClient);
+  }
+  
+  /**
+   * Get the registered timeline client.
+   * @return the registered timeline client
+   */
+  public TimelineClient getRegisteredTimeineClient() {
+    return client.getRegisteredTimeineClient();
+  }
 
   /**
    * Update application's blacklist with addition or removal resources.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
index addc3b6..f0f0bc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/AMRMClientAsyncImpl.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -65,6 +66,8 @@ extends AMRMClientAsync<T> {
   private volatile boolean keepRunning;
   private volatile float progress;
   
+  private volatile String aggregatorAddr;
+  
   private volatile Throwable savedException;
   
   public AMRMClientAsyncImpl(int intervalMs, CallbackHandler callbackHandler) {
@@ -304,7 +307,17 @@ extends AMRMClientAsync<T> {
           if (!allocated.isEmpty()) {
             handler.onContainersAllocated(allocated);
           }
-
+          
+          String aggregatorAddress = response.getAggregatorAddr();
+          TimelineClient timelineClient = client.getRegisteredTimeineClient();
+          if (timelineClient != null && aggregatorAddress != null 
+              && !aggregatorAddress.isEmpty()) {
+            if (aggregatorAddr == null || 
+                !aggregatorAddr.equals(aggregatorAddress)) {
+              aggregatorAddr = aggregatorAddress;
+              timelineClient.setTimelineServiceAddress(aggregatorAddress);
+            }
+          }
           progress = handler.getProgress();
         } catch (Throwable ex) {
           handler.onError(ex);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
index f2796fd..605c29a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
@@ -384,6 +384,23 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     }
     this.amrmToken = amRMToken;
   }
+  
+
+  @Override
+  public String getAggregatorAddr() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getAggregatorAddr();
+  }
+  
+  @Override
+  public void setAggregatorAddr(String aggregatorAddr) {
+    maybeInitBuilder();
+    if (aggregatorAddr == null) {
+      builder.clearAggregatorAddr();
+      return;
+    }
+    builder.setAggregatorAddr(aggregatorAddr);
+  }
 
   private synchronized void initLocalIncreasedContainerList() {
     if (this.increasedContainers != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
index d40ad7c..5db347e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
@@ -42,7 +42,6 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
 public abstract class TimelineClient extends AbstractService {
 
   protected ApplicationId contextAppId;
-  protected String timelineServiceAddress;
 
   @Public
   public static TimelineClient createTimelineClient() {
@@ -185,7 +184,6 @@ public abstract class TimelineClient extends AbstractService {
    * @param address
    *          the timeline service address
    */
-  public void setTimelineServiceAddress(String address) {
-    timelineServiceAddress = address;
-  }
+  public abstract void setTimelineServiceAddress(String address);
+  
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
index 22dcc00..407682d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
@@ -115,6 +115,15 @@ public class TimelineClientImpl extends TimelineClient {
   private DelegationTokenAuthenticatedURL.Token token;
   private UserGroupInformation authUgi;
   private String doAsUser;
+  
+  private volatile String timelineServiceAddress;
+  
+  // Retry parameters for discovering the new timeline service address
+  // TODO: consider merging with the connection retry logic
+  private int maxServiceRetries;
+  private long serviceRetryInterval;
+  
+  private boolean newTimelineService = false;
 
   @Private
   @VisibleForTesting
@@ -260,6 +269,7 @@ public class TimelineClientImpl extends TimelineClient {
 
   public TimelineClientImpl(ApplicationId applicationId) {
     super(TimelineClientImpl.class.getName(), applicationId);
+    this.newTimelineService = true;
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -287,18 +297,32 @@ public class TimelineClientImpl extends TimelineClient {
     client = new Client(new URLConnectionClientHandler(
         new TimelineURLConnectionFactory()), cc);
     TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
-    client.addFilter(retryFilter);
+    // TODO: clean up the filter retry logic later.
+    if (!newTimelineService) {
+      client.addFilter(retryFilter);
+    }
 
-    if (YarnConfiguration.useHttps(conf)) {
-      timelineServiceAddress = conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
+    // The old timeline service reads its address from configuration, while the
+    // new timeline service discovers it automatically (with retries).
+    if (newTimelineService) {
+      maxServiceRetries = conf.getInt(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
+      serviceRetryInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
     } else {
-      timelineServiceAddress = conf.get(
-          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
-    }
-    LOG.info("Timeline service address: " + timelineServiceAddress);
+      if (YarnConfiguration.useHttps(conf)) {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
+      } else {
+        setTimelineServiceAddress(conf.get(
+            YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
+      }
+      LOG.info("Timeline service address: " + getTimelineServiceAddress());
+    } 
     super.serviceInit(conf);
   }
 
@@ -341,8 +365,7 @@ public class TimelineClientImpl extends TimelineClient {
     if (async) {
       params.add("async", Boolean.TRUE.toString());
     }
-    putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
-        "entities", params, entitiesContainer);
+    putObjects("entities", params, entitiesContainer);
   }
 
   @Override
@@ -350,6 +373,60 @@ public class TimelineClientImpl extends TimelineClient {
       YarnException {
     doPosting(domain, "domain");
   }
+  
+  // Used for new timeline service only
+  @Private
+  public void putObjects(String path, MultivaluedMap<String, String> params, 
+      Object obj) throws IOException, YarnException {
+    
+    // timelineServiceAddress may not have been initialized yet, or may be
+    // stale (new timeline service only)
+    int retries = pollTimelineServiceAddress(this.maxServiceRetries);
+    
+    // timelineServiceAddress could be stale, add retry logic here.
+    boolean needRetry = true;
+    while (needRetry) {
+      try {
+        URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
+        putObjects(uri, path, params, obj);
+        needRetry = false;
+      }
+      catch (Exception e) {
+        // TODO: only retry on exceptions caused by timelineServiceAddress being
+        // updated; skip the retry for other exceptions.
+        checkRetryWithSleep(retries, e);
+        retries--;
+      }
+    }
+  }
+  
+  /**
+   * Sleep before the next retry, or rethrow the exception if no retries remain.
+   * @param retries number of retries left
+   * @param e the exception thrown by the last attempt
+   */
+  private void checkRetryWithSleep(int retries, Exception e) throws 
+      YarnException, IOException {
+    if (retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException ex) {
+        Thread.currentThread().interrupt();
+      }
+    } else {
+      LOG.error(
+        "TimelineClient has reached the max retry times: " + 
+        this.maxServiceRetries + " for service address: " + 
+        timelineServiceAddress);
+      if (e instanceof YarnException) {
+        throw (YarnException)e;
+      } else if (e instanceof IOException) {
+        throw (IOException)e;
+      } else {
+        throw new YarnException(e);
+      }
+    }
+  }
 
   private void putObjects(
       URI base, String path, MultivaluedMap<String, String> params, Object obj)
@@ -410,6 +487,15 @@ public class TimelineClientImpl extends TimelineClient {
     }
     return resp;
   }
+  
+  @Override
+  public void setTimelineServiceAddress(String address) {
+    this.timelineServiceAddress = address;
+  }
+  
+  private String getTimelineServiceAddress() {
+    return this.timelineServiceAddress;
+  }
 
   @SuppressWarnings("unchecked")
   @Override
@@ -424,8 +510,10 @@ public class TimelineClientImpl extends TimelineClient {
             DelegationTokenAuthenticatedURL authUrl =
                 new DelegationTokenAuthenticatedURL(authenticator,
                     connConfigurator);
+            // TODO we should add retry logic here if timelineServiceAddress is
+            // not available immediately.
             return (Token) authUrl.getDelegationToken(
-                constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
+                constructResURI(getConfig(), getTimelineServiceAddress(), false).toURL(),
                 token, renewer, doAsUser);
           }
         };
@@ -523,6 +611,7 @@ public class TimelineClientImpl extends TimelineClient {
     return connectionRetry.retryOn(tokenRetryOp);
   }
 
+  // Old timeline service, no external retry logic.
   @Private
   @VisibleForTesting
   public ClientResponse doPostingObject(Object object, String path) {
@@ -540,6 +629,24 @@ public class TimelineClientImpl extends TimelineClient {
       throw new YarnRuntimeException("Unknown resource type");
     }
   }
+  
+  /**
+   * Poll for the timeline service address, up to the given number of retries, while it is null.
+   * @param retries maximum number of retries
+   * @return the number of retries left
+   */
+  private int pollTimelineServiceAddress(int retries) {
+    while (timelineServiceAddress == null && retries > 0) {
+      try {
+        Thread.sleep(this.serviceRetryInterval);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      timelineServiceAddress = getTimelineServiceAddress();
+      retries--;
+    }
+    return retries;
+  }
 
   private class TimelineURLConnectionFactory
       implements HttpURLConnectionFactory {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
index 3aeb33e..e9a0a88 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/util/WebAppUtils.java
@@ -172,7 +172,7 @@ public class WebAppUtils {
     return getResolvedAddress(address);
   }
 
-  private static String getResolvedAddress(InetSocketAddress address) {
+  public static String getResolvedAddress(InetSocketAddress address) {
     address = NetUtils.getConnectAddress(address);
     StringBuilder sb = new StringBuilder();
     InetAddress resolved = address.getAddress();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index 66400c8..226d8ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -714,6 +714,12 @@
     <name>yarn.nodemanager.container-manager.thread-count</name>
     <value>20</value>
   </property>
+  
+  <property>
+    <description>Number of threads aggregator service uses.</description>
+    <name>yarn.nodemanager.aggregator-service.thread-count</name>
+    <value>5</value>
+  </property>
 
   <property>
     <description>Number of threads used in cleanup.</description>
@@ -782,6 +788,13 @@
     <name>yarn.nodemanager.localizer.address</name>
     <value>${yarn.nodemanager.hostname}:8040</value>
   </property>
+  
+  
+  <property>
+    <description>Address where the aggregator service IPC is.</description>
+    <name>yarn.nodemanager.aggregator-service.address</name>
+    <value>${yarn.nodemanager.hostname}:8048</value>
+  </property>
 
   <property>
     <description>Interval in between cache cleanups.</description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
index e2071dd..26d6d04 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestContainerLaunchRPC.java
@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -105,7 +106,7 @@ public class TestContainerLaunchRPC {
             resource, System.currentTimeMillis() + 10000, 42, 42,
             Priority.newInstance(0), 0);
       Token containerToken =
-          TestRPC.newContainerToken(nodeId, "password".getBytes(),
+          newContainerToken(nodeId, "password".getBytes(),
             containerTokenIdentifier);
 
       StartContainerRequest scRequest =
@@ -130,6 +131,19 @@ public class TestContainerLaunchRPC {
 
     Assert.fail("timeout exception should have occurred!");
   }
+  
+  public static Token newContainerToken(NodeId nodeId, byte[] password,
+      ContainerTokenIdentifier tokenIdentifier) {
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
+    Token containerToken =
+        Token.newInstance(tokenIdentifier.getBytes(),
+          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return containerToken;
+  }
 
   public class DummyContainerManager implements ContainerManagementProtocol {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
deleted file mode 100644
index 39e6162..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/TestRPC.java
+++ /dev/null
@@ -1,247 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.hadoop.yarn;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.Records;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class TestRPC {
-
-  private static final String EXCEPTION_MSG = "test error";
-  private static final String EXCEPTION_CAUSE = "exception cause";
-  private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
-  
-  @Test
-  public void testUnknownCall() {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, HadoopYarnProtoRPC.class
-        .getName());
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    Server server = rpc.getServer(ContainerManagementProtocol.class,
-        new DummyContainerManager(), addr, conf, null, 1);
-    server.start();
-
-    // Any unrelated protocol would do
-    ApplicationClientProtocol proxy = (ApplicationClientProtocol) rpc.getProxy(
-        ApplicationClientProtocol.class, NetUtils.getConnectAddress(server), conf);
-
-    try {
-      proxy.getNewApplication(Records
-          .newRecord(GetNewApplicationRequest.class));
-      Assert.fail("Excepted RPC call to fail with unknown method.");
-    } catch (YarnException e) {
-      Assert.assertTrue(e.getMessage().matches(
-          "Unknown method getNewApplication called on.*"
-              + "org.apache.hadoop.yarn.proto.ApplicationClientProtocol"
-              + "\\$ApplicationClientProtocolService\\$BlockingInterface protocol."));
-    } catch (Exception e) {
-      e.printStackTrace();
-    }
-  }
-
-  @Test
-  public void testHadoopProtoRPC() throws Exception {
-    test(HadoopYarnProtoRPC.class.getName());
-  }
-  
-  private void test(String rpcClass) throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(YarnConfiguration.IPC_RPC_IMPL, rpcClass);
-    YarnRPC rpc = YarnRPC.create(conf);
-    String bindAddr = "localhost:0";
-    InetSocketAddress addr = NetUtils.createSocketAddr(bindAddr);
-    Server server = rpc.getServer(ContainerManagementProtocol.class, 
-            new DummyContainerManager(), addr, conf, null, 1);
-    server.start();
-    RPC.setProtocolEngine(conf, ContainerManagementProtocolPB.class, ProtobufRpcEngine.class);
-    ContainerManagementProtocol proxy = (ContainerManagementProtocol) 
-        rpc.getProxy(ContainerManagementProtocol.class, 
-            NetUtils.getConnectAddress(server), conf);
-    ContainerLaunchContext containerLaunchContext = 
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-
-    ApplicationId applicationId = ApplicationId.newInstance(0, 0);
-    ApplicationAttemptId applicationAttemptId =
-        ApplicationAttemptId.newInstance(applicationId, 0);
-    ContainerId containerId =
-        ContainerId.newContainerId(applicationAttemptId, 100);
-    NodeId nodeId = NodeId.newInstance("localhost", 1234);
-    Resource resource = Resource.newInstance(1234, 2);
-    ContainerTokenIdentifier containerTokenIdentifier =
-        new ContainerTokenIdentifier(containerId, "localhost", "user",
-          resource, System.currentTimeMillis() + 10000, 42, 42,
-          Priority.newInstance(0), 0);
-    Token containerToken = newContainerToken(nodeId, "password".getBytes(),
-          containerTokenIdentifier);
-
-    StartContainerRequest scRequest =
-        StartContainerRequest.newInstance(containerLaunchContext,
-          containerToken);
-    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
-    list.add(scRequest);
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    proxy.startContainers(allRequests);
-
-    List<ContainerId> containerIds = new ArrayList<ContainerId>();
-    containerIds.add(containerId);
-    GetContainerStatusesRequest gcsRequest =
-        GetContainerStatusesRequest.newInstance(containerIds);
-    GetContainerStatusesResponse response =
-        proxy.getContainerStatuses(gcsRequest);
-    List<ContainerStatus> statuses = response.getContainerStatuses();
-
-    //test remote exception
-    boolean exception = false;
-    try {
-      StopContainersRequest stopRequest =
-          recordFactory.newRecordInstance(StopContainersRequest.class);
-      stopRequest.setContainerIds(containerIds);
-      proxy.stopContainers(stopRequest);
-      } catch (YarnException e) {
-      exception = true;
-      Assert.assertTrue(e.getMessage().contains(EXCEPTION_MSG));
-      Assert.assertTrue(e.getMessage().contains(EXCEPTION_CAUSE));
-      System.out.println("Test Exception is " + e.getMessage());
-    } catch (Exception ex) {
-      ex.printStackTrace();
-    }
-    Assert.assertTrue(exception);
-    
-    server.stop();
-    Assert.assertNotNull(statuses.get(0));
-    Assert.assertEquals(ContainerState.RUNNING, statuses.get(0).getState());
-  }
-
-  public class DummyContainerManager implements ContainerManagementProtocol {
-
-    private List<ContainerStatus> statuses = new ArrayList<ContainerStatus>();
-
-    @Override
-    public GetContainerStatusesResponse getContainerStatuses(
-        GetContainerStatusesRequest request)
-    throws YarnException {
-      GetContainerStatusesResponse response = 
-          recordFactory.newRecordInstance(GetContainerStatusesResponse.class);
-      response.setContainerStatuses(statuses);
-      return response;
-    }
-
-    @Override
-    public StartContainersResponse startContainers(
-        StartContainersRequest requests) throws YarnException {
-      StartContainersResponse response =
-          recordFactory.newRecordInstance(StartContainersResponse.class);
-      for (StartContainerRequest request : requests.getStartContainerRequests()) {
-        Token containerToken = request.getContainerToken();
-        ContainerTokenIdentifier tokenId = null;
-
-        try {
-          tokenId = newContainerTokenIdentifier(containerToken);
-        } catch (IOException e) {
-          throw RPCUtil.getRemoteException(e);
-        }
-        ContainerStatus status =
-            recordFactory.newRecordInstance(ContainerStatus.class);
-        status.setState(ContainerState.RUNNING);
-        status.setContainerId(tokenId.getContainerID());
-        status.setExitStatus(0);
-        statuses.add(status);
-
-      }
-      return response;
-    }
-
-    @Override
-    public StopContainersResponse stopContainers(StopContainersRequest request) 
-    throws YarnException {
-      Exception e = new Exception(EXCEPTION_MSG, 
-          new Exception(EXCEPTION_CAUSE));
-      throw new YarnException(e);
-    }
-  }
-
-  public static ContainerTokenIdentifier newContainerTokenIdentifier(
-      Token containerToken) throws IOException {
-    org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
-        new org.apache.hadoop.security.token.Token<ContainerTokenIdentifier>(
-            containerToken.getIdentifier()
-                .array(), containerToken.getPassword().array(), new Text(
-                containerToken.getKind()),
-            new Text(containerToken.getService()));
-    return token.decodeIdentifier();
-  }
-
-  public static Token newContainerToken(NodeId nodeId, byte[] password,
-      ContainerTokenIdentifier tokenIdentifier) {
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr =
-        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
-    // NOTE: use SecurityUtil.setTokenService if this becomes a "real" token
-    Token containerToken =
-        Token.newInstance(tokenIdentifier.getBytes(),
-          ContainerTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return containerToken;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
index fbe9af9..ef0bdcc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestAllocateResponse.java
@@ -111,4 +111,21 @@ public class TestAllocateResponse {
     Assert.assertEquals(0, r.getIncreasedContainers().size());
     Assert.assertEquals(0, r.getDecreasedContainers().size());
   }
+  
+  @SuppressWarnings("deprecation")
+  @Test
+  public void testAllocateResponseWithAggregatorAddress() {
+    final String aggregatorAddr = "localhost:0";
+    AllocateResponse r =
+        AllocateResponse.newInstance(3, new ArrayList<ContainerStatus>(),
+            new ArrayList<Container>(), new ArrayList<NodeReport>(), null,
+            AMCommand.AM_RESYNC, 3, null, new ArrayList<NMToken>(), null, 
+            null, null, aggregatorAddr);
+
+    AllocateResponseProto p = ((AllocateResponsePBImpl) r).getProto();
+    r = new AllocateResponsePBImpl(p);
+
+    // check value
+    Assert.assertEquals(aggregatorAddr, r.getAggregatorAddr());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
index 1a4dab8..d1e4acb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/pom.xml
@@ -142,6 +142,7 @@
                   <include>yarn_server_common_service_protos.proto</include>
                   <include>ResourceTracker.proto</include>
                   <include>SCMUploader.proto</include>
+                  <include>aggregatornodemanager_protocol.proto</include>
                 </includes>
               </source>
               <output>${project.build.directory}/generated-sources/java</output>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
new file mode 100644
index 0000000..53bdb4e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocol.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+
+/**
+ * <p>The protocol between a <code>TimelineAggregatorsCollection</code> and a
+ * <code>NodeManager</code> to report that a new application aggregator has been launched.
+ * </p>
+ * 
+ */
+@Private
+public interface AggregatorNodemanagerProtocol {
+
+  /**
+   * 
+   * <p>
+   * The <code>TimelineAggregatorsCollection</code> provides a list of mappings
+   * from applications to aggregator addresses in
+   * {@link ReportNewAggregatorsInfoRequest} so that a <code>NodeManager</code> can
+   * <em>register</em> the aggregators' info, namely the applicationId and the REST URI
+   * used to access the aggregator. The NodeManager adds them to its registered
+   * aggregators and reports them to the <code>ResourceManager</code> afterwards.
+   * </p>
+   * 
+   * @param request the request to register a new aggregator or a list of aggregators
+   * @return the response to the aggregator registration
+   * @throws YarnException
+   * @throws IOException
+   */
+  ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request)
+      throws YarnException, IOException;
+  
+}
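
A minimal usage sketch of the protocol above: a timeline aggregator collection
reports one newly launched app-level aggregator to the local NodeManager. The
NodeManager address, port, application id, and timeline REST URI are assumptions
for illustration; the typed proxy comes from YarnRPC, which resolves
AggregatorNodemanagerProtocolPBClientImpl by package-naming convention.

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;

public class ReportAggregatorSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Hypothetical NM-side aggregator service address; not a default from this patch.
    InetSocketAddress nmAddr = NetUtils.createSocketAddr("localhost:8048");

    // YarnRPC hands back a proxy backed by AggregatorNodemanagerProtocolPBClientImpl.
    AggregatorNodemanagerProtocol proxy =
        (AggregatorNodemanagerProtocol) YarnRPC.create(conf).getProxy(
            AggregatorNodemanagerProtocol.class, nmAddr, conf);

    // Report a single app-level aggregator; the id and URI are made up.
    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
    ReportNewAggregatorsInfoRequest request = ReportNewAggregatorsInfoRequest
        .newInstance(appId, "http://localhost:8188/ws/v2/timeline");
    proxy.reportNewAggregatorInfo(request);
  }
}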

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
new file mode 100644
index 0000000..4df80a5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/AggregatorNodemanagerProtocolPB.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.ipc.ProtocolInfo;
+import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
+
+@Private
+@Unstable
+@ProtocolInfo(
+    protocolName = "org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB",
+    protocolVersion = 1)
+public interface AggregatorNodemanagerProtocolPB extends 
+    AggregatorNodemanagerProtocolService.BlockingInterface {
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
new file mode 100644
index 0000000..6e777e7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/client/AggregatorNodemanagerProtocolPBClientImpl.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.client;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.ipc.RPCUtil;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+
+import com.google.protobuf.ServiceException;
+
+public class AggregatorNodemanagerProtocolPBClientImpl implements
+    AggregatorNodemanagerProtocol, Closeable {
+
+  // Not a documented config; only used internally for tests.
+  static final String NM_COMMAND_TIMEOUT = YarnConfiguration.YARN_PREFIX
+      + "rpc.nm-command-timeout";
+
+  /**
+   * Maximum timeout of one minute for a node to react to the command.
+   */
+  static final int DEFAULT_COMMAND_TIMEOUT = 60000;
+  
+  private AggregatorNodemanagerProtocolPB proxy;
+  
+  @Private
+  public AggregatorNodemanagerProtocolPBClientImpl(long clientVersion,
+      InetSocketAddress addr, Configuration conf) throws IOException {
+    RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
+      ProtobufRpcEngine.class);
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+
+    int expireIntvl = conf.getInt(NM_COMMAND_TIMEOUT, DEFAULT_COMMAND_TIMEOUT);
+    proxy =
+        (AggregatorNodemanagerProtocolPB) RPC.getProxy(
+            AggregatorNodemanagerProtocolPB.class,
+            clientVersion, addr, ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf), expireIntvl);
+  }
+  
+  @Override
+  public ReportNewAggregatorsInfoResponse reportNewAggregatorInfo(
+      ReportNewAggregatorsInfoRequest request) throws YarnException, IOException {
+  
+    ReportNewAggregatorsInfoRequestProto requestProto =
+        ((ReportNewAggregatorsInfoRequestPBImpl) request).getProto();
+    try {
+      return new ReportNewAggregatorsInfoResponsePBImpl(
+          proxy.reportNewAggregatorInfo(null, requestProto));
+    } catch (ServiceException e) {
+      RPCUtil.unwrapAndThrowException(e);
+      return null;
+    }
+  }
+  
+  @Override
+  public void close() {
+    if (this.proxy != null) {
+      RPC.stopProxy(this.proxy);
+    }
+  }
+
+}
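
The proxy above is built directly with RPC.getProxy, and the NM_COMMAND_TIMEOUT key
exists only so tests can shorten the 60-second default before a call to an
unreachable NodeManager fails. A small sketch of such a test configuration,
assuming the literal key mirrors the constant's value (the "yarn." prefix plus
"rpc.nm-command-timeout"):

import org.apache.hadoop.conf.Configuration;

public class NmCommandTimeoutSketch {
  // Test-only: shorten the NM command timeout (default 60000 ms) so a dead
  // NodeManager makes the RPC fail fast. The key is intentionally undocumented.
  static Configuration shortTimeoutConf() {
    Configuration conf = new Configuration();
    conf.setInt("yarn.rpc.nm-command-timeout", 3000);
    return conf;
  }
}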

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
new file mode 100644
index 0000000..87bce16
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/impl/pb/service/AggregatorNodemanagerProtocolPBServiceImpl.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.impl.pb.service;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoRequestProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewAggregatorsInfoResponseProto;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoRequestPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.ReportNewAggregatorsInfoResponsePBImpl;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+public class AggregatorNodemanagerProtocolPBServiceImpl implements
+    AggregatorNodemanagerProtocolPB {
+
+  private AggregatorNodemanagerProtocol real;
+  
+  public AggregatorNodemanagerProtocolPBServiceImpl(AggregatorNodemanagerProtocol impl) {
+    this.real = impl;
+  }
+
+  @Override
+  public ReportNewAggregatorsInfoResponseProto reportNewAggregatorInfo(
+      RpcController arg0, ReportNewAggregatorsInfoRequestProto proto) 
+      throws ServiceException {
+    ReportNewAggregatorsInfoRequestPBImpl request = 
+        new ReportNewAggregatorsInfoRequestPBImpl(proto);
+    try {
+      ReportNewAggregatorsInfoResponse response = real.reportNewAggregatorInfo(request);
+      return ((ReportNewAggregatorsInfoResponsePBImpl)response).getProto();
+    } catch (YarnException e) {
+      throw new ServiceException(e);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+}
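
This service-side translator still has to be bound to a Hadoop RPC server before a
NodeManager can accept reports; in this patch that wiring lives in the new
NMAggregatorService, but a bare-bones sketch looks roughly as follows. The bind
address and port are assumptions, and 'impl' stands for whatever NM-side handler
implements AggregatorNodemanagerProtocol.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.yarn.proto.AggregatorNodemanagerProtocol.AggregatorNodemanagerProtocolService;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocol;
import org.apache.hadoop.yarn.server.api.AggregatorNodemanagerProtocolPB;
import org.apache.hadoop.yarn.server.api.impl.pb.service.AggregatorNodemanagerProtocolPBServiceImpl;

import com.google.protobuf.BlockingService;

public class AggregatorServerSketch {
  static RPC.Server startServer(Configuration conf, AggregatorNodemanagerProtocol impl)
      throws IOException {
    RPC.setProtocolEngine(conf, AggregatorNodemanagerProtocolPB.class,
        ProtobufRpcEngine.class);
    // Wrap the handler in the PB translator and expose it as a blocking service.
    BlockingService service = AggregatorNodemanagerProtocolService
        .newReflectiveBlockingService(
            new AggregatorNodemanagerProtocolPBServiceImpl(impl));
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(AggregatorNodemanagerProtocolPB.class)
        .setInstance(service)
        .setBindAddress("0.0.0.0") // assumed bind address
        .setPort(8048)             // assumed port, not defined by this patch
        .build();
    server.start();
    return server;
  }
}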

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
index addd3fe..0b020b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.util.Records;
@@ -36,6 +40,21 @@ public abstract class NodeHeartbeatRequest {
         .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
     return nodeHeartbeatRequest;
   }
+  
+  public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+      MasterKey lastKnownContainerTokenMasterKey,
+      MasterKey lastKnownNMTokenMasterKey,
+      Map<ApplicationId, String> registeredAggregators) {
+    NodeHeartbeatRequest nodeHeartbeatRequest =
+        Records.newRecord(NodeHeartbeatRequest.class);
+    nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+    nodeHeartbeatRequest
+        .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+    nodeHeartbeatRequest
+        .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+    nodeHeartbeatRequest.setRegisteredAggregators(registeredAggregators);
+    return nodeHeartbeatRequest;
+  }
 
   public abstract NodeStatus getNodeStatus();
   public abstract void setNodeStatus(NodeStatus status);
@@ -45,4 +64,8 @@ public abstract class NodeHeartbeatRequest {
   
   public abstract MasterKey getLastKnownNMTokenMasterKey();
   public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+  
+  // This tells the RM the address info of the aggregators registered on this node.
+  public abstract Map<ApplicationId, String> getRegisteredAggregators();
+  public abstract void setRegisteredAggregators(Map<ApplicationId, String> appAggregatorsMap);
 }
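
A sketch of the new factory method in use, assuming nodeStatus and the two master
keys come from the NodeManager's status updater; the application id and aggregator
REST URI are made up for illustration:

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;

public class HeartbeatRequestSketch {
  static NodeHeartbeatRequest build(NodeStatus nodeStatus,
      MasterKey lastContainerTokenKey, MasterKey lastNMTokenKey) {
    Map<ApplicationId, String> aggregators = new HashMap<ApplicationId, String>();
    // Hypothetical app-level aggregator registered on this node.
    aggregators.put(ApplicationId.newInstance(1426600000000L, 7),
        "http://nm-host:8188/ws/v2/timeline");
    return NodeHeartbeatRequest.newInstance(
        nodeStatus, lastContainerTokenKey, lastNMTokenKey, aggregators);
  }
}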

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 9fb44ca..262ca07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -35,6 +35,10 @@ public interface NodeHeartbeatResponse {
   List<ContainerId> getContainersToBeRemovedFromNM();
 
   List<ApplicationId> getApplicationsToCleanup();
+  
+  // This tells the NM the address info of the aggregators for the related apps.
+  Map<ApplicationId, String> getAppAggregatorsMap();
+  void setAppAggregatorsMap(Map<ApplicationId, String> appAggregatorsMap);
 
   void setResponseId(int responseId);
   void setNodeAction(NodeAction action);
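
On the NodeManager side the map travels back in the heartbeat response; a short
sketch of reading it, assuming the response and application id are already in
scope in the status updater:

import java.util.Map;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;

public class AggregatorLookupSketch {
  // Returns the aggregator address the RM reports for one application, or null
  // if the RM does not know an aggregator for it yet.
  static String aggregatorAddressFor(NodeHeartbeatResponse response,
      ApplicationId appId) {
    Map<ApplicationId, String> appAggregators = response.getAppAggregatorsMap();
    return appAggregators.get(appId);
  }
}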

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
new file mode 100644
index 0000000..ae538a2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoRequest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.List;
+import java.util.Arrays;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;
+import org.apache.hadoop.yarn.util.Records;
+
+@Private
+public abstract class ReportNewAggregatorsInfoRequest {
+  
+  public static ReportNewAggregatorsInfoRequest newInstance(
+      List<AppAggregatorsMap> appAggregatorsList) {
+    ReportNewAggregatorsInfoRequest request =
+        Records.newRecord(ReportNewAggregatorsInfoRequest.class);
+    request.setAppAggregatorsList(appAggregatorsList);
+    return request;
+  }
+  
+  public static ReportNewAggregatorsInfoRequest newInstance(
+      ApplicationId id, String aggregatorAddr) {
+    ReportNewAggregatorsInfoRequest request =
+        Records.newRecord(ReportNewAggregatorsInfoRequest.class);
+    request.setAppAggregatorsList(
+        Arrays.asList(AppAggregatorsMap.newInstance(id, aggregatorAddr)));
+    return request;
+  }
+  
+  public abstract List<AppAggregatorsMap> getAppAggregatorsList();
+  
+  public abstract void setAppAggregatorsList(
+      List<AppAggregatorsMap> appAggregatorsList);
+  
+}
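
Besides the single-application form used earlier, the list-based factory can batch
several mappings in one report; both applications and URIs below are illustrative:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewAggregatorsInfoRequest;
import org.apache.hadoop.yarn.server.api.records.AppAggregatorsMap;

public class BatchReportSketch {
  static ReportNewAggregatorsInfoRequest build() {
    List<AppAggregatorsMap> maps = Arrays.asList(
        AppAggregatorsMap.newInstance(
            ApplicationId.newInstance(1426600000000L, 1),
            "http://nm1:8188/ws/v2/timeline"),
        AppAggregatorsMap.newInstance(
            ApplicationId.newInstance(1426600000000L, 2),
            "http://nm2:8188/ws/v2/timeline"));
    return ReportNewAggregatorsInfoRequest.newInstance(maps);
  }
}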

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
new file mode 100644
index 0000000..3b847d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/ReportNewAggregatorsInfoResponse.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.util.Records;
+
+public abstract class ReportNewAggregatorsInfoResponse {
+
+  @Private
+  public static ReportNewAggregatorsInfoResponse newInstance() {
+    ReportNewAggregatorsInfoResponse response =
+        Records.newRecord(ReportNewAggregatorsInfoResponse.class);
+    return response;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
index 26d1f19..39eaabd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
@@ -18,8 +18,16 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -36,6 +44,7 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private NodeStatus nodeStatus = null;
   private MasterKey lastKnownContainerTokenMasterKey = null;
   private MasterKey lastKnownNMTokenMasterKey = null;
+  Map<ApplicationId, String> registeredAggregators = null;
   
   public NodeHeartbeatRequestPBImpl() {
     builder = NodeHeartbeatRequestProto.newBuilder();
@@ -80,6 +89,20 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.setLastKnownNmTokenMasterKey(
           convertToProtoFormat(this.lastKnownNMTokenMasterKey));
     }
+    
+    if (this.registeredAggregators != null) {
+      addRegisteredAggregatorsToProto();
+    }
+  }
+  
+  private void addRegisteredAggregatorsToProto() {
+    maybeInitBuilder();
+    builder.clearRegisteredAggregators();
+    for (Map.Entry<ApplicationId, String> entry : registeredAggregators.entrySet()) {
+      builder.addRegisteredAggregators(AppAggregatorsMapProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setAppAggregatorAddr(entry.getValue()));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -162,6 +185,36 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
       builder.clearLastKnownNmTokenMasterKey();
     this.lastKnownNMTokenMasterKey = masterKey;
   }
+  
+  @Override
+  public Map<ApplicationId, String> getRegisteredAggregators() {
+    if (this.registeredAggregators != null) {
+      return this.registeredAggregators;
+    }
+    initRegisteredAggregators();
+    return registeredAggregators;
+  }
+  
+  private void initRegisteredAggregators() {
+    NodeHeartbeatRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppAggregatorsMapProto> list = p.getRegisteredAggregatorsList();
+    this.registeredAggregators = new HashMap<ApplicationId, String> ();
+    for (AppAggregatorsMapProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      this.registeredAggregators.put(appId, c.getAppAggregatorAddr());
+    }
+  }
+  
+  @Override
+  public void setRegisteredAggregators(
+      Map<ApplicationId, String> registeredAggregators) {
+    if (registeredAggregators == null || registeredAggregators.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.registeredAggregators = new HashMap<ApplicationId, String>();
+    this.registeredAggregators.putAll(registeredAggregators);
+  }
 
   private NodeStatusPBImpl convertFromProtoFormat(NodeStatusProto p) {
     return new NodeStatusPBImpl(p);
@@ -170,6 +223,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
   private NodeStatusProto convertToProtoFormat(NodeStatus t) {
     return ((NodeStatusPBImpl)t).getProto();
   }
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+    return new ApplicationIdPBImpl(p);
+  }
+  
+  private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+    return ((ApplicationIdPBImpl) t).getProto();
+  }
 
   private MasterKeyPBImpl convertFromProtoFormat(MasterKeyProto p) {
     return new MasterKeyPBImpl(p);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8a637914/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 630a5bf..019b2ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppAggregatorsMapProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
@@ -55,6 +56,8 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
   private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+  
+  Map<ApplicationId, String> appAggregatorsMap = null;
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
@@ -96,6 +99,10 @@ public class NodeHeartbeatResponsePBImpl extends
     if (this.systemCredentials != null) {
       addSystemCredentialsToProto();
     }
+    
+    if (this.appAggregatorsMap != null) {
+      addAppAggregatorsMapToProto();
+    }
   }
 
   private void addSystemCredentialsToProto() {
@@ -108,6 +115,16 @@ public class NodeHeartbeatResponsePBImpl extends
             entry.getValue().duplicate())));
     }
   }
+  
+  private void addAppAggregatorsMapToProto() {
+    maybeInitBuilder();
+    builder.clearAppAggregatorsMap();
+    for (Map.Entry<ApplicationId, String> entry : appAggregatorsMap.entrySet()) {
+      builder.addAppAggregatorsMap(AppAggregatorsMapProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setAppAggregatorAddr(entry.getValue()));
+    }
+  }
 
   private void mergeLocalToProto() {
     if (viaProto) 
@@ -417,6 +434,15 @@ public class NodeHeartbeatResponsePBImpl extends
     initSystemCredentials();
     return systemCredentials;
   }
+  
+  @Override
+  public Map<ApplicationId, String> getAppAggregatorsMap() {
+    if (this.appAggregatorsMap != null) {
+      return this.appAggregatorsMap;
+    }
+    initAppAggregatorsMap();
+    return appAggregatorsMap;
+  }
 
   private void initSystemCredentials() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -428,6 +454,16 @@ public class NodeHeartbeatResponsePBImpl extends
       this.systemCredentials.put(appId, byteBuffer);
     }
   }
+  
+  private void initAppAggregatorsMap() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<AppAggregatorsMapProto> list = p.getAppAggregatorsMapList();
+    this.appAggregatorsMap = new HashMap<ApplicationId, String> ();
+    for (AppAggregatorsMapProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      this.appAggregatorsMap.put(appId, c.getAppAggregatorAddr());
+    }
+  }
 
   @Override
   public void setSystemCredentialsForApps(
@@ -439,6 +475,17 @@ public class NodeHeartbeatResponsePBImpl extends
     this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
     this.systemCredentials.putAll(systemCredentials);
   }
+  
+  @Override
+  public void setAppAggregatorsMap(
+      Map<ApplicationId, String> appAggregatorsMap) {
+    if (appAggregatorsMap == null || appAggregatorsMap.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.appAggregatorsMap = new HashMap<ApplicationId, String>();
+    this.appAggregatorsMap.putAll(appAggregatorsMap);
+  }
 
   @Override
   public long getNextHeartBeatInterval() {

