hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject [09/50] [abbrv] hadoop git commit: YARN-3333. Rename TimelineAggregator etc. to TimelineCollector. Contributed by Sangjin Lee
Date Thu, 13 Aug 2015 21:20:00 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
new file mode 100644
index 0000000..59ecef1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline collector manager. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private final TimelineCollectorManager collectorManager;
+
+  public PerNodeTimelineCollectorsAuxService() {
+    // use the same singleton
+    this(TimelineCollectorManager.getInstance());
+  }
+
+  @VisibleForTesting PerNodeTimelineCollectorsAuxService(
+      TimelineCollectorManager collectorsManager) {
+    super("timeline_collector");
+    this.collectorManager = collectorsManager;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    collectorManager.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    collectorManager.start();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    collectorManager.stop();
+    super.serviceStop();
+  }
+
+  // these methods can be used as the basis for future service methods if the
+  // per-node collector runs separate from the node manager
+  /**
+   * Creates and adds an app level collector for the specified application id.
+   * The collector is also initialized and started. If the service already
+   * exists, no new service is created.
+   *
+   * @return whether it was added successfully
+   */
+  public boolean addApplication(ApplicationId appId) {
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(appId.toString());
+    return (collectorManager.putIfAbsent(appId, collector)
+        == collector);
+  }
+
+  /**
+   * Removes the app level collector for the specified application id. The
+   * collector is also stopped as a result. If the collector does not exist, no
+   * change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    String appIdString = appId.toString();
+    return collectorManager.remove(appIdString);
+  }
+
+  /**
+   * Creates and adds an app level collector for the specified application id.
+   * The collector is also initialized and started. If the collector already
+   * exists, no new collector is created.
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    // intercept the event of the AM container being created and initialize the
+    // app level collector service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      addApplication(appId);
+    }
+  }
+
+  /**
+   * Removes the app level collector for the specified application id. The
+   * collector is also stopped as a result. If the collector does not exist, no
+   * change is made.
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    // intercept the event of the AM container being stopped and remove the app
+    // level collector service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      removeApplication(appId);
+    }
+  }
+
+  private boolean isApplicationMaster(ContainerContext context) {
+    // TODO this is based on a (shaky) assumption that the container id (the
+    // last field of the full container id) for an AM is always 1
+    // we want to make this much more reliable
+    ContainerId containerId = context.getContainerId();
+    return containerId.getContainerId() == 1L;
+  }
+
+  @VisibleForTesting
+  boolean hasApplication(String appId) {
+    return collectorManager.containsKey(appId);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    // TODO currently it is not used; we can return a more meaningful data when
+    // we connect it with an AM
+    return ByteBuffer.allocate(0);
+  }
+
+  @VisibleForTesting
+  public static PerNodeTimelineCollectorsAuxService
+      launchServer(String[] args, TimelineCollectorManager collectorManager) {
+    Thread
+      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(
+        PerNodeTimelineCollectorsAuxService.class, args, LOG);
+    PerNodeTimelineCollectorsAuxService auxService = null;
+    try {
+      auxService = collectorManager == null ?
+          new PerNodeTimelineCollectorsAuxService() :
+          new PerNodeTimelineCollectorsAuxService(collectorManager);
+      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(auxService),
+          SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration();
+      auxService.init(conf);
+      auxService.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting PerNodeTimelineCollectorServer", t);
+      ExitUtil.terminate(-1, "Error starting PerNodeTimelineCollectorServer");
+    }
+    return auxService;
+  }
+
+  private static class ShutdownHook implements Runnable {
+    private final PerNodeTimelineCollectorsAuxService auxService;
+
+    public ShutdownHook(PerNodeTimelineCollectorsAuxService auxService) {
+      this.auxService = auxService;
+    }
+
+    public void run() {
+      auxService.stop();
+    }
+  }
+
+  public static void main(String[] args) {
+    launchServer(args, null);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
new file mode 100644
index 0000000..6e20e69
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public abstract class TimelineCollector extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(TimelineCollector.class);
+
+  private TimelineWriter writer;
+
+  public TimelineCollector(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    writer = ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class,
+        TimelineWriter.class), conf);
+    writer.init(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+    writer.stop();
+  }
+
+  public TimelineWriter getWriter() {
+    return writer;
+  }
+
+  /**
+   * Handles entity writes. These writes are synchronous and are written to the
+   * backing storage without buffering/batching. If any entity already exists,
+   * it results in an update of the entity.
+   *
+   * This method should be reserved for selected critical entities and events.
+   * For normal voluminous writes one should use the async method
+   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   * @return the response that contains the result of the post.
+   */
+  public TimelineWriteResponse postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
+      LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+          + callerUgi + ")");
+    }
+
+    return writer.write(entities);
+  }
+
+  /**
+   * Handles entity writes in an asynchronous manner. The method returns as soon
+   * as validation is done. No promises are made on how quickly it will be
+   * written to the backing storage or if it will always be written to the
+   * backing storage. Multiple writes to the same entities may be batched and
+   * appropriate values updated and result in fewer writes to the backing
+   * storage.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
new file mode 100644
index 0000000..3691162
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Class that manages adding and removing collectors and their lifecycle. It
+ * provides thread safety access to the collectors inside.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class TimelineCollectorManager extends CompositeService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineCollectorManager.class);
+  private static final TimelineCollectorManager INSTANCE =
+      new TimelineCollectorManager();
+
+  // access to this map is synchronized with the map itself
+  private final Map<String, TimelineCollector> collectors =
+      Collections.synchronizedMap(
+          new HashMap<String, TimelineCollector>());
+
+  // REST server for this collector manager
+  private HttpServer2 timelineRestServer;
+
+  private String timelineRestServerBindAddress;
+
+  private CollectorNodemanagerProtocol nmCollectorService;
+
+  private InetSocketAddress nmCollectorServiceAddress;
+
+  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
+
+  static TimelineCollectorManager getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  protected TimelineCollectorManager() {
+    super(TimelineCollectorManager.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.nmCollectorServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    startWebApp();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (timelineRestServer != null) {
+      timelineRestServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  /**
+   * Put the collector into the collection if an collector mapped by id does
+   * not exist.
+   *
+   * @throws YarnRuntimeException if there was any exception in initializing and
+   * starting the app level service
+   * @return the collector associated with id after the potential put.
+   */
+  public TimelineCollector putIfAbsent(ApplicationId appId,
+      TimelineCollector collector) {
+    String id = appId.toString();
+    TimelineCollector collectorInTable;
+    boolean collectorIsNew = false;
+    synchronized (collectors) {
+      collectorInTable = collectors.get(id);
+      if (collectorInTable == null) {
+        try {
+          // initialize, start, and add it to the collection so it can be
+          // cleaned up when the parent shuts down
+          collector.init(getConfig());
+          collector.start();
+          collectors.put(id, collector);
+          LOG.info("the collector for " + id + " was added");
+          collectorInTable = collector;
+          collectorIsNew = true;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
+        }
+      } else {
+        String msg = "the collector for " + id + " already exists!";
+        LOG.error(msg);
+      }
+
+    }
+    // Report to NM if a new collector is added.
+    if (collectorIsNew) {
+      try {
+        reportNewCollectorToNM(appId);
+      } catch (Exception e) {
+        // throw exception here as it cannot be used if failed report to NM
+        LOG.error("Failed to report a new collector for application: " + appId +
+            " to the NM Collector Service.");
+        throw new YarnRuntimeException(e);
+      }
+    }
+
+    return collectorInTable;
+  }
+
+  /**
+   * Removes the collector for the specified id. The collector is also stopped
+   * as a result. If the collector does not exist, no change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean remove(String id) {
+    synchronized (collectors) {
+      TimelineCollector collector = collectors.remove(id);
+      if (collector == null) {
+        String msg = "the collector for " + id + " does not exist!";
+        LOG.error(msg);
+        return false;
+      } else {
+        // stop the service to do clean up
+        collector.stop();
+        LOG.info("the collector service for " + id + " was removed");
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Returns the collector for the specified id.
+   *
+   * @return the collector or null if it does not exist
+   */
+  public TimelineCollector get(String id) {
+    return collectors.get(id);
+  }
+
+  /**
+   * Returns whether the collector for the specified id exists in this
+   * collection.
+   */
+  public boolean containsKey(String id) {
+    return collectors.containsKey(id);
+  }
+
+  /**
+   * Launch the REST web server for this collector manager
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    // use the same ports as the old ATS for now; we could create new properties
+    // for the new timeline service if needed
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+        YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        NetUtils.createSocketAddr(bindAddress));
+    LOG.info("Instantiating the per-node collector webapp at " +
+        timelineRestServerBindAddress);
+    try {
+      Configuration confForInfoServer = new Configuration(conf);
+      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(conf)
+          .addEndpoint(URI.create("http://" + bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this by an authentication filter in future.
+      HashMap<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineCollectorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY,
+          TimelineCollectorManager.getInstance());
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node collector webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  private void reportNewCollectorToNM(ApplicationId appId)
+      throws YarnException, IOException {
+    this.nmCollectorService = getNMCollectorService();
+    ReportNewCollectorInfoRequest request =
+        ReportNewCollectorInfoRequest.newInstance(appId,
+            this.timelineRestServerBindAddress);
+    LOG.info("Report a new collector for application: " + appId +
+        " to the NM Collector Service.");
+    nmCollectorService.reportNewCollectorInfo(request);
+  }
+
+  @VisibleForTesting
+  protected CollectorNodemanagerProtocol getNMCollectorService() {
+    Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    // TODO Security settings.
+    return (CollectorNodemanagerProtocol) rpc.getProxy(
+        CollectorNodemanagerProtocol.class,
+        nmCollectorServiceAddress, conf);
+  }
+
+  @VisibleForTesting
+  public String getRestServerBindAddress() {
+    return timelineRestServerBindAddress;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
new file mode 100644
index 0000000..5adae71
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
@@ -0,0 +1,183 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import javax.servlet.ServletContext;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class TimelineCollectorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(TimelineCollectorWebService.class);
+
+  private @Context ServletContext context;
+
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    private String about;
+
+    public AboutInfo() {
+
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline Collector API");
+  }
+
+  /**
+   * Accepts writes to the collector, and returns a response. It simply routes
+   * the request to the app level collector. It expects an application as a
+   * context.
+   */
+  @PUT
+  @Path("/entities")
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public Response putEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @QueryParam("async") String async,
+      @QueryParam("appid") String appId,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
+    try {
+      appId = parseApplicationId(appId);
+      if (appId == null) {
+        return Response.status(Response.Status.BAD_REQUEST).build();
+      }
+      TimelineCollector collector = getCollector(req, appId);
+      if (collector == null) {
+        LOG.error("Application not found");
+        throw new NotFoundException(); // different exception?
+      }
+      collector.postEntities(entities, callerUgi);
+      return Response.ok().build();
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private String parseApplicationId(String appId) {
+    try {
+      if (appId != null) {
+        return ConverterUtils.toApplicationId(appId.trim()).toString();
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  private TimelineCollector
+      getCollector(HttpServletRequest req, String appIdToParse) {
+    String appIdString = parseApplicationId(appIdToParse);
+    final TimelineCollectorManager collectorManager =
+        (TimelineCollectorManager) context.getAttribute(
+            TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+    return collectorManager.get(appIdString);
+  }
+
+  private void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  private UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUgi = null;
+    if (remoteUser != null) {
+      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUgi;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 4a57e97..f5603f6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -65,7 +65,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
    * Stores the entire information in {@link TimelineEntity} to the
    * timeline store. Any errors occurring for individual write request objects
    * will be reported in the response.
-   * 
+   *
    * @param data
    *          a {@link TimelineEntity} object
    * @return {@link TimelineWriteResponse} object.
@@ -116,10 +116,10 @@ public class FileSystemTimelineWriterImpl extends AbstractService
    * Aggregates the entity information to the timeline store based on which
    * track this entity is to be rolled up to The tracks along which aggregations
    * are to be done are given by {@link TimelineAggregationTrack}
-   * 
+   *
    * Any errors occurring for individual write request objects will be reported
    * in the response.
-   * 
+   *
    * @param data
    *          a {@link TimelineEntity} object
    *          a {@link TimelineAggregationTrack} enum value

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
deleted file mode 100644
index 8f95814..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelTimelineAggregator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-
-public class TestAppLevelTimelineAggregator {
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
deleted file mode 100644
index 1c89ead..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeTimelineAggregatorsAuxService.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.ExitUtil;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
-import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.junit.Test;
-
-public class TestPerNodeTimelineAggregatorsAuxService {
-  private ApplicationAttemptId appAttemptId;
-
-  public TestPerNodeTimelineAggregatorsAuxService() {
-    ApplicationId appId =
-        ApplicationId.newInstance(System.currentTimeMillis(), 1);
-    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-  }
-
-  @Test
-  public void testAddApplication() throws Exception {
-    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
-    // auxService should have a single app
-    assertTrue(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
-    auxService.close();
-  }
-
-  @Test
-  public void testAddApplicationNonAMContainer() throws Exception {
-    PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
-
-    ContainerId containerId = getContainerId(2L); // not an AM
-    ContainerInitializationContext context =
-        mock(ContainerInitializationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    auxService.initializeContainer(context);
-    // auxService should not have that app
-    assertFalse(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
-  }
-
-  @Test
-  public void testRemoveApplication() throws Exception {
-    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
-    // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
-
-    ContainerId containerId = getAMContainerId();
-    ContainerTerminationContext context =
-        mock(ContainerTerminationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    auxService.stopContainer(context);
-    // auxService should not have that app
-    assertFalse(auxService.hasApplication(appIdStr));
-    auxService.close();
-  }
-
-  @Test
-  public void testRemoveApplicationNonAMContainer() throws Exception {
-    PerNodeTimelineAggregatorsAuxService auxService = createAggregatorAndAddApplication();
-    // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
-
-    ContainerId containerId = getContainerId(2L); // not an AM
-    ContainerTerminationContext context =
-        mock(ContainerTerminationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    auxService.stopContainer(context);
-    // auxService should still have that app
-    assertTrue(auxService.hasApplication(appIdStr));
-    auxService.close();
-  }
-
-  @Test(timeout = 60000)
-  public void testLaunch() throws Exception {
-    ExitUtil.disableSystemExit();
-    PerNodeTimelineAggregatorsAuxService auxService = null;
-    try {
-      auxService =
-          PerNodeTimelineAggregatorsAuxService.launchServer(new String[0]);
-    } catch (ExitUtil.ExitException e) {
-      assertEquals(0, e.status);
-      ExitUtil.resetFirstExitException();
-      fail();
-    } finally {
-      if (auxService != null) {
-        auxService.stop();
-      }
-    }
-  }
-
-  private PerNodeTimelineAggregatorsAuxService createAggregatorAndAddApplication() {
-    PerNodeTimelineAggregatorsAuxService auxService = createAggregator();
-    // create an AM container
-    ContainerId containerId = getAMContainerId();
-    ContainerInitializationContext context =
-        mock(ContainerInitializationContext.class);
-    when(context.getContainerId()).thenReturn(containerId);
-    auxService.initializeContainer(context);
-    return auxService;
-  }
-
-  private PerNodeTimelineAggregatorsAuxService createAggregator() {
-    TimelineAggregatorsCollection
-        aggregatorsCollection = spy(new TimelineAggregatorsCollection());
-    doReturn(new Configuration()).when(aggregatorsCollection).getConfig();
-    PerNodeTimelineAggregatorsAuxService auxService =
-        spy(new PerNodeTimelineAggregatorsAuxService(aggregatorsCollection));
-    return auxService;
-  }
-
-  private ContainerId getAMContainerId() {
-    return getContainerId(1L);
-  }
-
-  private ContainerId getContainerId(long id) {
-    return ContainerId.newContainerId(appAttemptId, id);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
deleted file mode 100644
index dd64629..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestTimelineAggregatorsCollection.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.timelineservice.aggregator;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.spy;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-
-import com.sun.jersey.core.impl.provider.entity.XMLJAXBElementProvider;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.junit.Test;
-
-public class TestTimelineAggregatorsCollection {
-
-  @Test(timeout=60000)
-  public void testMultithreadedAdd() throws Exception {
-    final TimelineAggregatorsCollection aggregatorCollection =
-        spy(new TimelineAggregatorsCollection());
-    doReturn(new Configuration()).when(aggregatorCollection).getConfig();
-
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          AppLevelTimelineAggregator aggregator =
-              new AppLevelTimelineAggregator(appId.toString());
-          return (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      assertTrue(aggregatorCollection.containsKey(String.valueOf(i)));
-    }
-  }
-
-  @Test
-  public void testMultithreadedAddAndRemove() throws Exception {
-    final TimelineAggregatorsCollection aggregatorCollection =
-        spy(new TimelineAggregatorsCollection());
-    doReturn(new Configuration()).when(aggregatorCollection).getConfig();
-
-    final int NUM_APPS = 5;
-    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
-    for (int i = 0; i < NUM_APPS; i++) {
-      final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      Callable<Boolean> task = new Callable<Boolean>() {
-        public Boolean call() {
-          AppLevelTimelineAggregator aggregator =
-              new AppLevelTimelineAggregator(appId.toString());
-          boolean successPut =
-              (aggregatorCollection.putIfAbsent(appId, aggregator) == aggregator);
-          return successPut && aggregatorCollection.remove(appId.toString());
-        }
-      };
-      tasks.add(task);
-    }
-    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
-    try {
-      List<Future<Boolean>> futures = executor.invokeAll(tasks);
-      for (Future<Boolean> future: futures) {
-        assertTrue(future.get());
-      }
-    } finally {
-      executor.shutdownNow();
-    }
-    // check the keys
-    for (int i = 0; i < NUM_APPS; i++) {
-      assertFalse(aggregatorCollection.containsKey(String.valueOf(i)));
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
new file mode 100644
index 0000000..74c81a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestAppLevelTimelineCollector.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+
+public class TestAppLevelTimelineCollector {
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
new file mode 100644
index 0000000..3b20352
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+public class TestPerNodeTimelineCollectorsAuxService {
+  private ApplicationAttemptId appAttemptId;
+
+  public TestPerNodeTimelineCollectorsAuxService() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    assertTrue(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    auxService.close();
+  }
+
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService = createCollector();
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+  }
+
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should not have that app
+    assertFalse(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeTimelineCollectorsAuxService auxService =
+        createCollectorAndAddApplication();
+    // auxService should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(auxService.hasApplication(appIdStr));
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.stopContainer(context);
+    // auxService should still have that app
+    assertTrue(auxService.hasApplication(appIdStr));
+    auxService.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeTimelineCollectorsAuxService auxService = null;
+    try {
+      auxService =
+          PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
+              createCollectorManager());
+    } catch (ExitUtil.ExitException e) {
+      assertEquals(0, e.status);
+      ExitUtil.resetFirstExitException();
+      fail();
+    } finally {
+      if (auxService != null) {
+        auxService.stop();
+      }
+    }
+  }
+
+  private PerNodeTimelineCollectorsAuxService
+      createCollectorAndAddApplication() {
+    PerNodeTimelineCollectorsAuxService auxService = createCollector();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    auxService.initializeContainer(context);
+    return auxService;
+  }
+
+  private PerNodeTimelineCollectorsAuxService createCollector() {
+    TimelineCollectorManager collectorManager = createCollectorManager();
+    PerNodeTimelineCollectorsAuxService auxService =
+        spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
+    return auxService;
+  }
+
+  private TimelineCollectorManager createCollectorManager() {
+    TimelineCollectorManager collectorManager =
+        spy(new TimelineCollectorManager());
+    doReturn(new Configuration()).when(collectorManager).getConfig();
+    CollectorNodemanagerProtocol nmCollectorService =
+        mock(CollectorNodemanagerProtocol.class);
+    doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
+    return collectorManager;
+  }
+
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
new file mode 100644
index 0000000..541665b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.junit.Test;
+
+public class TestTimelineCollectorManager {
+
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final TimelineCollectorManager collectorManager = createCollectorManager();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineCollector collector =
+              new AppLevelTimelineCollector(appId.toString());
+          return (collectorManager.putIfAbsent(appId, collector) == collector);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      assertTrue(collectorManager.containsKey(appId.toString()));
+    }
+  }
+
+  @Test
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final TimelineCollectorManager collectorManager = createCollectorManager();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          AppLevelTimelineCollector collector =
+              new AppLevelTimelineCollector(appId.toString());
+          boolean successPut =
+              (collectorManager.putIfAbsent(appId, collector) == collector);
+          return successPut && collectorManager.remove(appId.toString());
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      final ApplicationId appId = ApplicationId.newInstance(0L, i);
+      assertFalse(collectorManager.containsKey(appId.toString()));
+    }
+  }
+
+  private TimelineCollectorManager createCollectorManager() {
+    final TimelineCollectorManager collectorManager =
+        spy(new TimelineCollectorManager());
+    doReturn(new Configuration()).when(collectorManager).getConfig();
+    CollectorNodemanagerProtocol nmCollectorService =
+        mock(CollectorNodemanagerProtocol.class);
+    doReturn(nmCollectorService).when(collectorManager).getNMCollectorService();
+    return collectorManager;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/187b4425/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index f720454..7f919f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -27,12 +27,12 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.Test;
-import org.apache.commons.io.FileUtils;
 
 public class TestFileSystemTimelineWriterImpl {
 
@@ -42,9 +42,6 @@ public class TestFileSystemTimelineWriterImpl {
    */
   @Test
   public void testWriteEntityToFile() throws Exception {
-    String name =  "unit_test_BaseAggregator_testWriteEntityToFile_"
-        + Long.toString(System.currentTimeMillis());
-
     TimelineEntities te = new TimelineEntities();
     TimelineEntity entity = new TimelineEntity();
     String id = "hello";
@@ -55,25 +52,27 @@ public class TestFileSystemTimelineWriterImpl {
     entity.setModifiedTime(1425016502000L);
     te.addEntity(entity);
 
-    FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl();
-    fsi.serviceInit(new Configuration());
-    fsi.write(te);
+    try (FileSystemTimelineWriterImpl fsi =
+        new FileSystemTimelineWriterImpl()) {
+      fsi.serviceInit(new Configuration());
+      fsi.write(te);
 
-    String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
-        + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-    Path path = Paths.get(fileName);
-    File f = new File(fileName);
-    assertTrue(f.exists() && !f.isDirectory());
-    List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
-    // ensure there's only one entity + 1 new line
-    assertTrue(data.size() == 2);
-    String d = data.get(0);
-    // confirm the contents same as what was written
-    assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+          + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = Paths.get(fileName);
+      File f = new File(fileName);
+      assertTrue(f.exists() && !f.isDirectory());
+      List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
+      // ensure there's only one entity + 1 new line
+      assertTrue(data.size() == 2);
+      String d = data.get(0);
+      // confirm the contents same as what was written
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
 
-    // delete the directory
-    File outputDir = new File(fsi.getOutputRoot());
-    FileUtils.deleteDirectory(outputDir);
-    assertTrue(!(f.exists()));
+      // delete the directory
+      File outputDir = new File(fsi.getOutputRoot());
+      FileUtils.deleteDirectory(outputDir);
+      assertTrue(!(f.exists()));
+    }
   }
 }


Mime
View raw message