hadoop-common-commits mailing list archives

From varunsax...@apache.org
Subject [33/50] [abbrv] hadoop git commit: YARN-5355. Backported YARN-2928 into our branch-2 feature branch.
Date Sun, 06 Nov 2016 16:31:50 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
new file mode 100644
index 0000000..9c0a983
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntity.java
@@ -0,0 +1,584 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+import java.util.TreeSet;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+import org.codehaus.jackson.annotate.JsonSetter;
+
+/**
+ * The basic timeline entity data structure for timeline service v2. Timeline
+ * entity objects are not thread safe and should not be accessed concurrently.
+ * All collection members will be initialized to empty collections. Two
+ * timeline entities are equal iff their type and id are identical.
+ *
+ * All non-primitive, non-collection members will be initialized to null.
+ * Users should set the type and id of a timeline entity to make it valid (this
+ * can be checked with the {@link #isValid()} method). Callers of the getters
+ * should perform null checks on non-primitive, non-collection members.
+ *
+ * Callers are advised not to alter the collection objects returned by the
+ * getters.
+ */
+@XmlRootElement(name = "entity")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineEntity implements Comparable<TimelineEntity> {
+  protected final static String SYSTEM_INFO_KEY_PREFIX = "SYSTEM_INFO_";
+
+  /**
+   * Identifier of a timeline entity (entity id + entity type).
+   */
+  @XmlRootElement(name = "identifier")
+  @XmlAccessorType(XmlAccessType.NONE)
+  public static class Identifier {
+    private String type;
+    private String id;
+
+    public Identifier(String type, String id) {
+      this.type = type;
+      this.id = id;
+    }
+
+    public Identifier() {
+
+    }
+
+    @XmlElement(name = "type")
+    public String getType() {
+      return type;
+    }
+
+    public void setType(String entityType) {
+      this.type = entityType;
+    }
+
+    @XmlElement(name = "id")
+    public String getId() {
+      return id;
+    }
+
+    public void setId(String entityId) {
+      this.id = entityId;
+    }
+
+    @Override
+    public String toString() {
+      return "TimelineEntity[" +
+          "type='" + type + '\'' +
+          ", id='" + id + '\'' + "]";
+    }
+
+    @Override
+    public int hashCode() {
+      final int prime = 31;
+      int result = 1;
+      result = prime * result + ((id == null) ? 0 : id.hashCode());
+      result =
+        prime * result + ((type == null) ? 0 : type.hashCode());
+      return result;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      if (!(obj instanceof Identifier)) {
+        return false;
+      }
+      Identifier other = (Identifier) obj;
+      if (id == null) {
+        if (other.getId() != null) {
+          return false;
+        }
+      } else if (!id.equals(other.getId())) {
+        return false;
+      }
+      if (type == null) {
+        if (other.getType() != null) {
+          return false;
+        }
+      } else if (!type.equals(other.getType())) {
+        return false;
+      }
+      return true;
+    }
+  }
+
+  private TimelineEntity real;
+  private Identifier identifier;
+  private HashMap<String, Object> info = new HashMap<>();
+  private HashMap<String, String> configs = new HashMap<>();
+  private Set<TimelineMetric> metrics = new HashSet<>();
+  // events should be sorted by timestamp in descending order
+  private NavigableSet<TimelineEvent> events = new TreeSet<>();
+  private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
+  private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
+  private Long createdTime;
+
+  public TimelineEntity() {
+    identifier = new Identifier();
+  }
+
+  /**
+   * <p>
+   * This constructor is used to construct a proxy {@link TimelineEntity} or its
+   * subclass object from the real entity object that carries information.
+   * </p>
+   *
+   * <p>
+   * It is usually used in the case where we want to recover class polymorphism
+   * after deserializing the entity from its JSON form.
+   * </p>
+   * @param entity the real entity that carries information
+   */
+  public TimelineEntity(TimelineEntity entity) {
+    real = entity.getReal();
+  }
+
+  protected TimelineEntity(String type) {
+    this();
+    identifier.type = type;
+  }
+
+  @XmlElement(name = "type")
+  public String getType() {
+    if (real == null) {
+      return identifier.type;
+    } else {
+      return real.getType();
+    }
+  }
+
+  public void setType(String type) {
+    if (real == null) {
+      identifier.type = type;
+    } else {
+      real.setType(type);
+    }
+  }
+
+  @XmlElement(name = "id")
+  public String getId() {
+    if (real == null) {
+      return identifier.id;
+    } else {
+      return real.getId();
+    }
+  }
+
+  public void setId(String id) {
+    if (real == null) {
+      identifier.id = id;
+    } else {
+      real.setId(id);
+    }
+  }
+
+  public Identifier getIdentifier() {
+    if (real == null) {
+      return identifier;
+    } else {
+      return real.getIdentifier();
+    }
+  }
+
+  public void setIdentifier(Identifier entityIdentifier) {
+    if (real == null) {
+      this.identifier = entityIdentifier;
+    } else {
+      real.setIdentifier(entityIdentifier);
+    }
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "info")
+  public HashMap<String, Object> getInfoJAXB() {
+    if (real == null) {
+      return info;
+    } else {
+      return real.getInfoJAXB();
+    }
+  }
+
+  public Map<String, Object> getInfo() {
+    if (real == null) {
+      return info;
+    } else {
+      return real.getInfo();
+    }
+  }
+
+  public void setInfo(Map<String, Object> entityInfos) {
+    if (real == null) {
+      this.info = TimelineServiceHelper.mapCastToHashMap(entityInfos);
+    } else {
+      real.setInfo(entityInfos);
+    }
+  }
+
+  public void addInfo(Map<String, Object> entityInfos) {
+    if (real == null) {
+      this.info.putAll(entityInfos);
+    } else {
+      real.addInfo(entityInfos);
+    }
+  }
+
+  public void addInfo(String key, Object value) {
+    if (real == null) {
+      info.put(key, value);
+    } else {
+      real.addInfo(key, value);
+    }
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "configs")
+  public HashMap<String, String> getConfigsJAXB() {
+    if (real == null) {
+      return configs;
+    } else {
+      return real.getConfigsJAXB();
+    }
+  }
+
+  public Map<String, String> getConfigs() {
+    if (real == null) {
+      return configs;
+    } else {
+      return real.getConfigs();
+    }
+  }
+
+  public void setConfigs(Map<String, String> entityConfigs) {
+    if (real == null) {
+      this.configs = TimelineServiceHelper.mapCastToHashMap(entityConfigs);
+    } else {
+      real.setConfigs(entityConfigs);
+    }
+  }
+
+  public void addConfigs(Map<String, String> entityConfigs) {
+    if (real == null) {
+      this.configs.putAll(entityConfigs);
+    } else {
+      real.addConfigs(entityConfigs);
+    }
+  }
+
+  public void addConfig(String key, String value) {
+    if (real == null) {
+      configs.put(key, value);
+    } else {
+      real.addConfig(key, value);
+    }
+  }
+
+  @XmlElement(name = "metrics")
+  public Set<TimelineMetric> getMetrics() {
+    if (real == null) {
+      return metrics;
+    } else {
+      return real.getMetrics();
+    }
+  }
+
+  public void setMetrics(Set<TimelineMetric> entityMetrics) {
+    if (real == null) {
+      this.metrics = entityMetrics;
+    } else {
+      real.setMetrics(entityMetrics);
+    }
+  }
+
+  public void addMetrics(Set<TimelineMetric> entityMetrics) {
+    if (real == null) {
+      this.metrics.addAll(entityMetrics);
+    } else {
+      real.addMetrics(entityMetrics);
+    }
+  }
+
+  public void addMetric(TimelineMetric metric) {
+    if (real == null) {
+      metrics.add(metric);
+    } else {
+      real.addMetric(metric);
+    }
+  }
+
+  @XmlElement(name = "events")
+  public NavigableSet<TimelineEvent> getEvents() {
+    if (real == null) {
+      return events;
+    } else {
+      return real.getEvents();
+    }
+  }
+
+  public void setEvents(NavigableSet<TimelineEvent> entityEvents) {
+    if (real == null) {
+      this.events = entityEvents;
+    } else {
+      real.setEvents(entityEvents);
+    }
+  }
+
+  public void addEvents(Set<TimelineEvent> entityEvents) {
+    if (real == null) {
+      this.events.addAll(entityEvents);
+    } else {
+      real.addEvents(entityEvents);
+    }
+  }
+
+  public void addEvent(TimelineEvent event) {
+    if (real == null) {
+      events.add(event);
+    } else {
+      real.addEvent(event);
+    }
+  }
+
+  public Map<String, Set<String>> getIsRelatedToEntities() {
+    if (real == null) {
+      return isRelatedToEntities;
+    } else {
+      return real.getIsRelatedToEntities();
+    }
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "isrelatedto")
+  public HashMap<String, Set<String>> getIsRelatedToEntitiesJAXB() {
+    if (real == null) {
+      return isRelatedToEntities;
+    } else {
+      return real.getIsRelatedToEntitiesJAXB();
+    }
+  }
+
+  @JsonSetter("isrelatedto")
+  public void setIsRelatedToEntities(
+      Map<String, Set<String>> isRelatedTo) {
+    if (real == null) {
+      this.isRelatedToEntities =
+          TimelineServiceHelper.mapCastToHashMap(isRelatedTo);
+    } else {
+      real.setIsRelatedToEntities(isRelatedTo);
+    }
+  }
+
+  public void addIsRelatedToEntities(
+      Map<String, Set<String>> isRelatedTo) {
+    if (real == null) {
+      for (Map.Entry<String, Set<String>> entry : isRelatedTo.entrySet()) {
+        Set<String> ids = this.isRelatedToEntities.get(entry.getKey());
+        if (ids == null) {
+          ids = new HashSet<>();
+          this.isRelatedToEntities.put(entry.getKey(), ids);
+        }
+        ids.addAll(entry.getValue());
+      }
+    } else {
+      real.addIsRelatedToEntities(isRelatedTo);
+    }
+  }
+
+  public void addIsRelatedToEntity(String type, String id) {
+    if (real == null) {
+      Set<String> ids = isRelatedToEntities.get(type);
+      if (ids == null) {
+        ids = new HashSet<>();
+        isRelatedToEntities.put(type, ids);
+      }
+      ids.add(id);
+    } else {
+      real.addIsRelatedToEntity(type, id);
+    }
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "relatesto")
+  public HashMap<String, Set<String>> getRelatesToEntitiesJAXB() {
+    if (real == null) {
+      return relatesToEntities;
+    } else {
+      return real.getRelatesToEntitiesJAXB();
+    }
+  }
+
+  public Map<String, Set<String>> getRelatesToEntities() {
+    if (real == null) {
+      return relatesToEntities;
+    } else {
+      return real.getRelatesToEntities();
+    }
+  }
+
+  public void addRelatesToEntities(Map<String, Set<String>> relatesTo) {
+    if (real == null) {
+      for (Map.Entry<String, Set<String>> entry : relatesTo.entrySet()) {
+        Set<String> ids = this.relatesToEntities.get(entry.getKey());
+        if (ids == null) {
+          ids = new HashSet<>();
+          this.relatesToEntities.put(entry.getKey(), ids);
+        }
+        ids.addAll(entry.getValue());
+      }
+    } else {
+      real.addRelatesToEntities(relatesTo);
+    }
+  }
+
+  public void addRelatesToEntity(String type, String id) {
+    if (real == null) {
+      Set<String> ids = relatesToEntities.get(type);
+      if (ids == null) {
+        ids = new HashSet<>();
+        relatesToEntities.put(type, ids);
+      }
+      ids.add(id);
+    } else {
+      real.addRelatesToEntity(type, id);
+    }
+  }
+
+  @JsonSetter("relatesto")
+  public void setRelatesToEntities(Map<String, Set<String>> relatesTo) {
+    if (real == null) {
+      this.relatesToEntities =
+          TimelineServiceHelper.mapCastToHashMap(relatesTo);
+    } else {
+      real.setRelatesToEntities(relatesTo);
+    }
+  }
+
+  @XmlElement(name = "createdtime")
+  public Long getCreatedTime() {
+    if (real == null) {
+      return createdTime;
+    } else {
+      return real.getCreatedTime();
+    }
+  }
+
+  @JsonSetter("createdtime")
+  public void setCreatedTime(Long createdTs) {
+    if (real == null) {
+      this.createdTime = createdTs;
+    } else {
+      real.setCreatedTime(createdTs);
+    }
+  }
+
+  /**
+   * Set the UID in the info map; the UI will then use it for queries.
+   * @param uidKey key for the UID in the info map.
+   * @param uId UID to be set for the key.
+   */
+  public void setUID(String uidKey, String uId) {
+    if (real == null) {
+      info.put(uidKey, uId);
+    } else {
+      real.addInfo(uidKey, uId);
+    }
+  }
+
+  public boolean isValid() {
+    return (getId() != null && getType() != null);
+  }
+
+  // When computing the hash code of a timeline entity, or checking whether
+  // two entities are equal, we only compare their identifiers (id and type).
+  @Override
+  public int hashCode() {
+    return getIdentifier().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof TimelineEntity)) {
+      return false;
+    }
+    TimelineEntity other = (TimelineEntity) obj;
+    return getIdentifier().equals(other.getIdentifier());
+  }
+
+  @Override
+  public int compareTo(TimelineEntity other) {
+    int comparison = getType().compareTo(other.getType());
+    if (comparison == 0) {
+      if (getCreatedTime() == null) {
+        if (other.getCreatedTime() == null) {
+          return getId().compareTo(other.getId());
+        } else {
+          return 1;
+        }
+      }
+      if (other.getCreatedTime() == null) {
+        return -1;
+      }
+      if (getCreatedTime() > other.getCreatedTime()) {
+        // Order by created time desc
+        return -1;
+      } else if (getCreatedTime() < other.getCreatedTime()) {
+        return 1;
+      } else {
+        return getId().compareTo(other.getId());
+      }
+    } else {
+      return comparison;
+    }
+  }
+
+  protected TimelineEntity getReal() {
+    return real == null ? this : real;
+  }
+
+  public String toString() {
+    if (real == null) {
+      return identifier.toString();
+    } else {
+      return real.toString();
+    }
+  }
+}
\ No newline at end of file
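
For reference, a minimal usage sketch of the TimelineEntity API added above
(illustrative only; the sample class name and values are made up and are not
part of the patch):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;

public class TimelineEntityExample {
  public static void main(String[] args) {
    // An entity becomes valid once both id and type are set.
    TimelineEntity entity = new TimelineEntity();
    entity.setType("MY_APPLICATION_TYPE");
    entity.setId("app_0001");
    System.out.println(entity.isValid()); // true

    // Arbitrary key/value details and configuration can be attached.
    entity.addInfo("submittedBy", "alice");
    entity.addConfig("queue", "default");

    // Relationships are keyed by entity type, with a set of ids per type.
    entity.addIsRelatedToEntity("YARN_CONTAINER", "container_0001_01_000001");

    // Equality and hashCode only consider the (type, id) identifier.
    TimelineEntity same = new TimelineEntity();
    same.setType("MY_APPLICATION_TYPE");
    same.setId("app_0001");
    System.out.println(entity.equals(same)); // true
  }
}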

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
new file mode 100644
index 0000000..8fcc2ae
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntityType.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Defines the type of a timeline entity.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public enum TimelineEntityType {
+  YARN_CLUSTER,
+  YARN_FLOW_RUN,
+  YARN_APPLICATION,
+  YARN_APPLICATION_ATTEMPT,
+  YARN_CONTAINER,
+  YARN_USER,
+  YARN_QUEUE,
+  YARN_FLOW_ACTIVITY;
+
+  /**
+   * Whether the input type can be a parent of this entity.
+   *
+   * @param type entity type.
+   * @return true if the passed entity type can be a parent of this entity
+   *     type, false otherwise.
+   */
+  public boolean isParent(TimelineEntityType type) {
+    switch (this) {
+    case YARN_CLUSTER:
+      return false;
+    case YARN_FLOW_RUN:
+      return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
+    case YARN_APPLICATION:
+      return YARN_FLOW_RUN == type || YARN_CLUSTER == type;
+    case YARN_APPLICATION_ATTEMPT:
+      return YARN_APPLICATION == type;
+    case YARN_CONTAINER:
+      return YARN_APPLICATION_ATTEMPT == type;
+    case YARN_QUEUE:
+      return YARN_QUEUE == type;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Whether the input type can be a child of this entity.
+   *
+   * @param type entity type.
+   * @return true if the passed entity type can be a child of this entity
+   *     type, false otherwise.
+   */
+  public boolean isChild(TimelineEntityType type) {
+    switch (this) {
+    case YARN_CLUSTER:
+      return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
+    case YARN_FLOW_RUN:
+      return YARN_FLOW_RUN == type || YARN_APPLICATION == type;
+    case YARN_APPLICATION:
+      return YARN_APPLICATION_ATTEMPT == type;
+    case YARN_APPLICATION_ATTEMPT:
+      return YARN_CONTAINER == type;
+    case YARN_CONTAINER:
+      return false;
+    case YARN_QUEUE:
+      return YARN_QUEUE == type;
+    default:
+      return false;
+    }
+  }
+
+  /**
+   * Whether the type of this entity matches the type indicated by the input
+   * argument.
+   *
+   * @param typeString entity type represented as a string.
+   * @return true, if string representation of this entity type matches the
+   *     entity type passed.
+   */
+  public boolean matches(String typeString) {
+    return toString().equals(typeString);
+  }
+}
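
A short sketch of the parent/child checks defined by the enum above
(illustrative only; the sample class name is made up and is not part of the
patch):

import static org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType.*;

public class EntityTypeHierarchyExample {
  public static void main(String[] args) {
    // An application's parent can be a flow run or a cluster ...
    System.out.println(YARN_APPLICATION.isParent(YARN_FLOW_RUN)); // true
    System.out.println(YARN_APPLICATION.isParent(YARN_CLUSTER));  // true
    // ... and its only child type is the application attempt.
    System.out.println(YARN_APPLICATION.isChild(YARN_APPLICATION_ATTEMPT)); // true
    System.out.println(YARN_APPLICATION.isChild(YARN_CONTAINER));           // false

    // matches() compares against the string form of the constant.
    System.out.println(YARN_CONTAINER.matches("YARN_CONTAINER")); // true
  }
}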

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
new file mode 100644
index 0000000..87fc291
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEvent.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.util.TimelineServiceHelper;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * This class contains the information of an event that belongs to an entity.
+ * Users are free to define what the event means, such as starting an
+ * application, allocating a container, and so on.
+ */
+@XmlRootElement(name = "event")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineEvent implements Comparable<TimelineEvent> {
+  public static final long INVALID_TIMESTAMP = 0L;
+
+  private String id;
+  private HashMap<String, Object> info = new HashMap<>();
+  private long timestamp;
+
+  public TimelineEvent() {
+
+  }
+
+  @XmlElement(name = "id")
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String eventId) {
+    this.id = eventId;
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "info")
+  public HashMap<String, Object> getInfoJAXB() {
+    return info;
+  }
+
+  public Map<String, Object> getInfo() {
+    return info;
+  }
+
+  public void setInfo(Map<String, Object> infos) {
+    this.info = TimelineServiceHelper.mapCastToHashMap(infos);
+  }
+
+  public void addInfo(Map<String, Object> infos) {
+    this.info.putAll(infos);
+  }
+
+  public void addInfo(String key, Object value) {
+    info.put(key, value);
+  }
+
+  @XmlElement(name = "timestamp")
+  public long getTimestamp() {
+    return timestamp;
+  }
+
+  public void setTimestamp(long ts) {
+    this.timestamp = ts;
+  }
+
+  public boolean isValid() {
+    return (id != null && timestamp != INVALID_TIMESTAMP);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (int) (timestamp ^ (timestamp >>> 32));
+    result = 31 * result + id.hashCode();
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TimelineEvent)) {
+      return false;
+    }
+
+    TimelineEvent event = (TimelineEvent) o;
+
+    if (timestamp != event.timestamp) {
+      return false;
+    }
+    if (!id.equals(event.id)) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int compareTo(TimelineEvent other) {
+    if (timestamp > other.timestamp) {
+      return -1;
+    } else if (timestamp < other.timestamp) {
+      return 1;
+    } else {
+      return id.compareTo(other.id);
+    }
+  }
+}
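
A minimal sketch of how events are attached to an entity and ordered
(illustrative only; the sample class name, event ids and values are made up):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;

public class TimelineEventExample {
  public static void main(String[] args) {
    // An event needs an id and a non-zero timestamp to be valid.
    TimelineEvent started = new TimelineEvent();
    started.setId("APP_STARTED");
    started.setTimestamp(System.currentTimeMillis());
    started.addInfo("user", "alice");
    System.out.println(started.isValid()); // true

    TimelineEvent finished = new TimelineEvent();
    finished.setId("APP_FINISHED");
    finished.setTimestamp(started.getTimestamp() + 1000L);

    // Events held by an entity are kept newest first, because compareTo()
    // orders by descending timestamp.
    TimelineEntity entity = new TimelineEntity();
    entity.setType("MY_APPLICATION_TYPE");
    entity.setId("app_0001");
    entity.addEvent(started);
    entity.addEvent(finished);
    System.out.println(entity.getEvents().first().getId()); // APP_FINISHED
  }
}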

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
new file mode 100644
index 0000000..5c908d6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetric.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * This class contains the information of a metric that is related to some
+ * entity. A metric can be either a time series or a single value.
+ */
+@XmlRootElement(name = "metric")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineMetric {
+
+  /**
+   * Type of metric.
+   */
+  public static enum Type {
+    SINGLE_VALUE,
+    TIME_SERIES
+  }
+
+  private Type type;
+  private String id;
+  // By default, do not perform any real-time aggregation operations. This
+  // field will NOT be persisted (it behaves like a "transient" member).
+  private TimelineMetricOperation realtimeAggregationOp
+      = TimelineMetricOperation.NOP;
+
+  private TreeMap<Long, Number> values
+      = new TreeMap<>(Collections.reverseOrder());
+
+  public TimelineMetric() {
+    this(Type.SINGLE_VALUE);
+  }
+
+  public TimelineMetric(Type type) {
+    this.type = type;
+  }
+
+
+  @XmlElement(name = "type")
+  public Type getType() {
+    return type;
+  }
+
+  public void setType(Type metricType) {
+    this.type = metricType;
+  }
+
+  @XmlElement(name = "id")
+  public String getId() {
+    return id;
+  }
+
+  public void setId(String metricId) {
+    this.id = metricId;
+  }
+
+  /**
+   * Get the real time aggregation operation of this metric.
+   *
+   * @return Real time aggregation operation
+   */
+  // required by JAXB
+  @XmlElement(name = "aggregationOp")
+  public TimelineMetricOperation getRealtimeAggregationOp() {
+    return realtimeAggregationOp;
+  }
+
+  /**
+   * Set the real time aggregation operation of this metric.
+   *
+   * @param op A timeline metric operation that the metric should perform on
+   *           real time aggregations
+   */
+  public void setRealtimeAggregationOp(
+      final TimelineMetricOperation op) {
+    this.realtimeAggregationOp = op;
+  }
+
+  // required by JAXB
+  @InterfaceAudience.Private
+  @XmlElement(name = "values")
+  public TreeMap<Long, Number> getValuesJAXB() {
+    return values;
+  }
+
+  public Map<Long, Number> getValues() {
+    return values;
+  }
+
+  public void setValues(Map<Long, Number> vals) {
+    if (type == Type.SINGLE_VALUE) {
+      overwrite(vals);
+    } else {
+      if (vals != null) {
+        this.values = new TreeMap<>(Collections.reverseOrder());
+        this.values.putAll(vals);
+      } else {
+        this.values = null;
+      }
+    }
+  }
+
+  public void addValues(Map<Long, Number> vals) {
+    if (type == Type.SINGLE_VALUE) {
+      overwrite(vals);
+    } else {
+      this.values.putAll(vals);
+    }
+  }
+
+  public void addValue(long timestamp, Number value) {
+    if (type == Type.SINGLE_VALUE) {
+      values.clear();
+    }
+    values.put(timestamp, value);
+  }
+
+  private void overwrite(Map<Long, Number> vals) {
+    if (vals.size() > 1) {
+      throw new IllegalArgumentException(
+          "Values cannot contain more than one point in " +
+              Type.SINGLE_VALUE + " mode");
+    }
+    this.values.clear();
+    this.values.putAll(vals);
+  }
+
+  public boolean isValid() {
+    return (id != null);
+  }
+
+  @Override
+  public int hashCode() {
+    int result = id.hashCode();
+    result = 31 * result + type.hashCode();
+    return result;
+  }
+
+  // Only check if type and id are equal
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof TimelineMetric)) {
+      return false;
+    }
+
+    TimelineMetric m = (TimelineMetric) o;
+
+    if (!id.equals(m.id)) {
+      return false;
+    }
+    if (type != m.type) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "{id: " + id + ", type: " + type +
+        ", realtimeAggregationOp: " +
+        realtimeAggregationOp + "; " + values.toString() +
+        "}";
+  }
+
+  /**
+   * Get the latest data point of a timeline metric as a single-value metric.
+   *
+   * @param metric Incoming timeline metric
+   * @return A single-value metric holding the metric's latest data point
+   */
+  public static TimelineMetric getLatestSingleValueMetric(
+      TimelineMetric metric) {
+    if (metric.getType() == Type.SINGLE_VALUE) {
+      return metric;
+    } else {
+      TimelineMetric singleValueMetric = new TimelineMetric(Type.SINGLE_VALUE);
+      Long firstKey = metric.values.firstKey();
+      if (firstKey != null) {
+        Number firstValue = metric.values.get(firstKey);
+        singleValueMetric.addValue(firstKey, firstValue);
+      }
+      return singleValueMetric;
+    }
+  }
+
+  /**
+   * Get single data timestamp of the metric.
+   *
+   * @return the single data timestamp
+   */
+  public long getSingleDataTimestamp() {
+    if (this.type == Type.SINGLE_VALUE) {
+      if (values.size() == 0) {
+        throw new YarnRuntimeException("Values for this timeline metric is " +
+            "empty.");
+      } else {
+        return values.firstKey();
+      }
+    } else {
+      throw new YarnRuntimeException("Type for this timeline metric is not " +
+          "SINGLE_VALUE.");
+    }
+  }
+
+  /**
+   * Get single data value of the metric.
+   *
+   * @return the single data value
+   */
+  public Number getSingleDataValue() {
+    if (this.type == Type.SINGLE_VALUE) {
+      if (values.size() == 0) {
+        return null;
+      } else {
+        return values.get(values.firstKey());
+      }
+    } else {
+      throw new YarnRuntimeException("Type for this timeline metric is not " +
+          "SINGLE_VALUE.");
+    }
+  }
+
+  /**
+   * Aggregate an incoming metric into the base aggregated metric in a
+   * stateless fashion (no operation state is passed). The assumption here is
+   * that incomingMetric and baseAggregatedMetric are single-value data if
+   * not null.
+   *
+   * @param incomingMetric Incoming timeline metric to aggregate
+   * @param baseAggregatedMetric Base timeline metric
+   * @return Result metric after aggregation
+   */
+  public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+      TimelineMetric baseAggregatedMetric) {
+    return aggregateTo(incomingMetric, baseAggregatedMetric, null);
+  }
+
+  /**
+   * Aggregate an incoming metric into the base aggregated metric with the
+   * given operation state. The assumption here is that incomingMetric and
+   * baseAggregatedMetric are single-value data if not null.
+   *
+   * @param incomingMetric Incoming timeline metric to aggregate
+   * @param baseAggregatedMetric Base timeline metric
+   * @param state Operation state
+   * @return Result metric after aggregation
+   */
+  public static TimelineMetric aggregateTo(TimelineMetric incomingMetric,
+      TimelineMetric baseAggregatedMetric, Map<Object, Object> state) {
+    TimelineMetricOperation operation
+        = incomingMetric.getRealtimeAggregationOp();
+    return operation.aggregate(incomingMetric, baseAggregatedMetric, state);
+  }
+
+}
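
A minimal sketch of the two metric types above (illustrative only; the sample
class name, metric ids and values are made up):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;

public class TimelineMetricExample {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();

    // A TIME_SERIES metric keeps all data points, newest timestamp first.
    TimelineMetric series = new TimelineMetric(TimelineMetric.Type.TIME_SERIES);
    series.setId("MEMORY_MB");
    series.addValue(now - 2000L, 1024);
    series.addValue(now - 1000L, 2048);
    series.addValue(now, 4096);

    // A SINGLE_VALUE metric (the default) keeps only the point added last.
    TimelineMetric single = new TimelineMetric();
    single.setId("CPU_VCORES");
    single.addValue(now - 1000L, 2);
    single.addValue(now, 4);
    System.out.println(single.getValues().size());    // 1
    System.out.println(single.getSingleDataValue());  // 4

    // A time series can be collapsed to its latest data point.
    TimelineMetric latest = TimelineMetric.getLatestSingleValueMetric(series);
    System.out.println(latest.getSingleDataValue());  // 4096
  }
}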

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
new file mode 100644
index 0000000..4c9045f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricCalculator.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * A calculator for timeline metrics.
+ */
+public final class TimelineMetricCalculator {
+
+  private TimelineMetricCalculator() {
+    // do nothing.
+  }
+
+  /**
+   * Compare two non-null numbers.
+   * @param n1 Number n1
+   * @param n2 Number n2
+   * @return 0 if n1 equals n2, a negative int if n1 is less than n2, a
+   * positive int otherwise.
+   */
+  public static int compare(Number n1, Number n2) {
+    if (n1 == null || n2 == null) {
+      throw new YarnRuntimeException(
+          "Number to be compared shouldn't be null.");
+    }
+
+    if (n1 instanceof Integer || n1 instanceof Long) {
+      if (n1.longValue() == n2.longValue()) {
+        return 0;
+      } else {
+        return (n1.longValue() < n2.longValue()) ? -1 : 1;
+      }
+    }
+
+    if (n1 instanceof Float || n1 instanceof Double) {
+      if (n1.doubleValue() == n2.doubleValue()) {
+        return 0;
+      } else {
+        return (n1.doubleValue() < n2.doubleValue()) ? -1 : 1;
+      }
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    throw new YarnRuntimeException("Unsupported types for number comparison: "
+        + n1.getClass().getName() + ", " + n2.getClass().getName());
+  }
+
+  /**
+   * Subtract operation between two Numbers.
+   * @param n1 Number n1
+   * @param n2 Number n2
+   * @return a Number representing (n1 - n2).
+   */
+  public static Number sub(Number n1, Number n2) {
+    if (n1 == null) {
+      throw new YarnRuntimeException(
+          "Number to be subtracted shouldn't be null.");
+    } else if (n2 == null) {
+      return n1;
+    }
+
+    if (n1 instanceof Integer || n1 instanceof Long) {
+      return n1.longValue() - n2.longValue();
+    }
+
+    if (n1 instanceof Float || n1 instanceof Double) {
+      return n1.doubleValue() - n2.doubleValue();
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+
+  /**
+   * Sum up two Numbers.
+   * @param n1 Number n1
+   * @param n2 Number n2
+   * @return a Number representing (n1 + n2).
+   */
+  public static Number sum(Number n1, Number n2) {
+    if (n1 == null) {
+      return n2;
+    } else if (n2 == null) {
+      return n1;
+    }
+
+    if (n1 instanceof Integer || n1 instanceof Long) {
+      return n1.longValue() + n2.longValue();
+    }
+
+    if (n1 instanceof Float || n1 instanceof Double) {
+      return n1.doubleValue() + n2.doubleValue();
+    }
+
+    // TODO throw warnings/exceptions for other types of number.
+    return null;
+  }
+}
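
A short sketch of the helper above (illustrative only; the sample class name is
made up):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricCalculator;

public class MetricCalculatorExample {
  public static void main(String[] args) {
    // Integral types are compared and combined via longValue() ...
    System.out.println(TimelineMetricCalculator.compare(3L, 5)); // negative
    System.out.println(TimelineMetricCalculator.sum(3L, 5));     // 8
    System.out.println(TimelineMetricCalculator.sub(5L, 3));     // 2

    // ... and floating-point types via doubleValue().
    System.out.println(TimelineMetricCalculator.sum(1.5d, 2.25d)); // 3.75

    // A null second operand means "nothing to add or subtract".
    System.out.println(TimelineMetricCalculator.sum(null, 7));  // 7
    System.out.println(TimelineMetricCalculator.sub(7L, null)); // 7
  }
}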

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
new file mode 100644
index 0000000..58e5c38
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineMetricOperation.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import java.util.Map;
+
+/**
+ * Aggregation operations.
+ */
+public enum TimelineMetricOperation {
+  NOP("NOP") {
+    /**
+     * Do nothing on the base metric.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return Metric b
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base, Map<Object, Object> state) {
+      return base;
+    }
+  },
+  MAX("MAX") {
+    /**
+     * Keep the greater value of incoming and base. Stateless operation.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return the greater value of a and b
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base, Map<Object, Object> state) {
+      if (base == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.getSingleDataValue();
+      Number aggregateValue = base.getSingleDataValue();
+      if (aggregateValue == null) {
+        aggregateValue = Long.MIN_VALUE;
+      }
+      if (TimelineMetricCalculator.compare(incomingValue, aggregateValue) > 0) {
+        base.addValue(incoming.getSingleDataTimestamp(), incomingValue);
+      }
+      return base;
+    }
+  },
+  REPLACE("REPLACE") {
+    /**
+     * Replace the base metric with the incoming value. Stateless operation.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (not used)
+     * @return Metric a
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming,
+        TimelineMetric base,
+        Map<Object, Object> state) {
+      return incoming;
+    }
+  },
+  SUM("SUM") {
+    /**
+     * Return the sum of the incoming metric and the base metric if the
+     * operation is stateless. For stateful operations, also subtract the
+     * value of the timeline metric mapped to the PREV_METRIC_STATE_KEY
+     * in the state object.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state (PREV_METRIC_STATE_KEY's value as Metric p)
+     * @return A metric with value a + b - p
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+        Map<Object, Object> state) {
+      if (base == null) {
+        return incoming;
+      }
+      Number incomingValue = incoming.getSingleDataValue();
+      Number aggregateValue = base.getSingleDataValue();
+      Number result
+          = TimelineMetricCalculator.sum(incomingValue, aggregateValue);
+
+      // If there are previous value in the state, we will take it off from the
+      // sum
+      if (state != null) {
+        Object prevMetric = state.get(PREV_METRIC_STATE_KEY);
+        if (prevMetric instanceof TimelineMetric) {
+          result = TimelineMetricCalculator.sub(result,
+              ((TimelineMetric) prevMetric).getSingleDataValue());
+        }
+      }
+      base.addValue(incoming.getSingleDataTimestamp(), result);
+      return base;
+    }
+  },
+  AVG("AVERAGE") {
+    /**
+     * Return the average value of the incoming metric and the base metric,
+     * with a given state. Not supported yet.
+     *
+     * @param incoming Metric a
+     * @param base Metric b
+     * @param state Operation state
+     * @return Not finished yet
+     */
+    @Override
+    public TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+        Map<Object, Object> state) {
+      // Not supported yet
+      throw new UnsupportedOperationException(
+          "Unsupported aggregation operation: AVERAGE");
+    }
+  };
+
+  public static final String PREV_METRIC_STATE_KEY = "PREV_METRIC";
+
+  /**
+   * Perform the aggregation operation.
+   *
+   * @param incoming Incoming metric
+   * @param aggregate Base aggregation metric
+   * @param state Operation state
+   * @return Result metric for this aggregation operation
+   */
+  public TimelineMetric aggregate(TimelineMetric incoming,
+      TimelineMetric aggregate, Map<Object, Object> state) {
+    return exec(incoming, aggregate, state);
+  }
+
+  private final String opName;
+
+  TimelineMetricOperation(String opString) {
+    opName = opString;
+  }
+
+  @Override
+  public String toString() {
+    return this.opName;
+  }
+
+  abstract TimelineMetric exec(TimelineMetric incoming, TimelineMetric base,
+      Map<Object, Object> state);
+}
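
A sketch of stateless and stateful SUM aggregation with the operations above
(illustrative only; the sample class name, metric ids and values are made up):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;

public class MetricAggregationExample {
  public static void main(String[] args) {
    long now = System.currentTimeMillis();

    // Base (already aggregated) metric and an incoming single-value metric.
    TimelineMetric base = new TimelineMetric();
    base.setId("MEMORY_MB");
    base.addValue(now - 1000L, 1000);

    TimelineMetric incoming = new TimelineMetric();
    incoming.setId("MEMORY_MB");
    incoming.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
    incoming.addValue(now, 300);

    // Stateless SUM: the base becomes 1000 + 300.
    TimelineMetric summed = TimelineMetric.aggregateTo(incoming, base);
    System.out.println(summed.getSingleDataValue()); // 1300

    // Stateful SUM: the previously contributed value (kept under
    // PREV_METRIC_STATE_KEY) is subtracted so updates are not double counted.
    TimelineMetric prev = new TimelineMetric();
    prev.setId("MEMORY_MB");
    prev.addValue(now, 300);
    Map<Object, Object> state = new HashMap<>();
    state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, prev);

    TimelineMetric update = new TimelineMetric();
    update.setId("MEMORY_MB");
    update.setRealtimeAggregationOp(TimelineMetricOperation.SUM);
    update.addValue(now + 1000L, 500);
    TimelineMetric result = TimelineMetric.aggregateTo(update, summed, state);
    System.out.println(result.getSingleDataValue()); // 1300 + 500 - 300 = 1500
  }
}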

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
new file mode 100644
index 0000000..eda1ee2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineWriteResponse.java
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A class that holds a list of put errors. This is the response returned when a
+ * list of {@link TimelineEntity} objects is added to the timeline. If there are
+ * errors in storing individual entity objects, they will be indicated in the
+ * list of errors.
+ */
+@XmlRootElement(name = "response")
+@XmlAccessorType(XmlAccessType.NONE)
+@Public
+@Unstable
+public class TimelineWriteResponse {
+
+  private List<TimelineWriteError> errors = new ArrayList<TimelineWriteError>();
+
+  public TimelineWriteResponse() {
+
+  }
+
+  /**
+   * Get a list of {@link TimelineWriteError} instances.
+   *
+   * @return a list of {@link TimelineWriteError} instances
+   */
+  @XmlElement(name = "errors")
+  public List<TimelineWriteError> getErrors() {
+    return errors;
+  }
+
+  /**
+   * Add a single {@link TimelineWriteError} instance into the existing list.
+   *
+   * @param error
+   *          a single {@link TimelineWriteError} instance
+   */
+  public void addError(TimelineWriteError error) {
+    errors.add(error);
+  }
+
+  /**
+   * Add a list of {@link TimelineWriteError} instances into the existing list.
+   *
+   * @param writeErrors
+   *          a list of {@link TimelineWriteError} instances
+   */
+  public void addErrors(List<TimelineWriteError> writeErrors) {
+    this.errors.addAll(writeErrors);
+  }
+
+  /**
+   * Set the list to the given list of {@link TimelineWriteError} instances.
+   *
+   * @param writeErrors
+   *          a list of {@link TimelineWriteError} instances
+   */
+  public void setErrors(List<TimelineWriteError> writeErrors) {
+    this.errors.clear();
+    this.errors.addAll(writeErrors);
+  }
+
+  /**
+   * A class that holds the error code for one entity.
+   */
+  @XmlRootElement(name = "error")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class TimelineWriteError {
+
+    /**
+     * Error code returned if an IOException is encountered when storing an
+     * entity.
+     */
+    public static final int IO_EXCEPTION = 1;
+
+    private String entityId;
+    private String entityType;
+    private int errorCode;
+
+    /**
+     * Get the entity Id.
+     *
+     * @return the entity Id
+     */
+    @XmlElement(name = "entity")
+    public String getEntityId() {
+      return entityId;
+    }
+
+    /**
+     * Set the entity Id.
+     *
+     * @param id the entity Id.
+     */
+    public void setEntityId(String id) {
+      this.entityId = id;
+    }
+
+    /**
+     * Get the entity type.
+     *
+     * @return the entity type
+     */
+    @XmlElement(name = "entitytype")
+    public String getEntityType() {
+      return entityType;
+    }
+
+    /**
+     * Set the entity type.
+     *
+     * @param type the entity type.
+     */
+    public void setEntityType(String type) {
+      this.entityType = type;
+    }
+
+    /**
+     * Get the error code.
+     *
+     * @return an error code
+     */
+    @XmlElement(name = "errorcode")
+    public int getErrorCode() {
+      return errorCode;
+    }
+
+    /**
+     * Set the error code to the given error code.
+     *
+     * @param code an error code.
+     */
+    public void setErrorCode(int code) {
+      this.errorCode = code;
+    }
+
+  }
+
+}
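
A minimal sketch of how a writer would report per-entity failures with the
class above (illustrative only; the sample class name and values are made up):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;

public class WriteResponseExample {
  public static void main(String[] args) {
    // One TimelineWriteError per entity that could not be stored.
    TimelineWriteError error = new TimelineWriteError();
    error.setEntityId("app_0001");
    error.setEntityType("YARN_APPLICATION");
    error.setErrorCode(TimelineWriteError.IO_EXCEPTION);

    TimelineWriteResponse response = new TimelineWriteResponse();
    response.addError(error);

    // Callers inspect the error list; an empty list means all entities were
    // stored successfully.
    for (TimelineWriteError e : response.getErrors()) {
      System.out.println(e.getEntityType() + "/" + e.getEntityId()
          + " failed with code " + e.getErrorCode());
    }
  }
}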

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java
new file mode 100644
index 0000000..ced57c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/UserEntity.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * This entity represents a user.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class UserEntity extends TimelineEntity {
+  public UserEntity() {
+    super(TimelineEntityType.YARN_USER.toString());
+  }
+
+  public UserEntity(TimelineEntity entity) {
+    super(entity);
+    if (!entity.getType().equals(TimelineEntityType.YARN_USER.toString())) {
+      throw new IllegalArgumentException("Incompatible entity type: "
+          + entity.getType());
+    }
+  }
+}
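
A short sketch of the subclass above, including the proxy-constructor pattern
used across the new entity classes (illustrative only; the sample class name
and values are made up):

import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;

public class UserEntityExample {
  public static void main(String[] args) {
    // Directly created user entity: the type is fixed to YARN_USER.
    UserEntity user = new UserEntity();
    user.setId("alice");
    System.out.println(user.getType()); // YARN_USER

    // Recovering the subclass view from a generic entity (e.g. one that was
    // deserialized from JSON); entities of any other type are rejected.
    TimelineEntity generic = new TimelineEntity();
    generic.setType(TimelineEntityType.YARN_USER.toString());
    generic.setId("alice");
    UserEntity wrapped = new UserEntity(generic);
    System.out.println(wrapped.getId()); // alice
  }
}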

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java
new file mode 100644
index 0000000..43805c8
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package org.apache.hadoop.yarn.api.records.timelineservice contains classes
+ * which define the data model for ATSv2.
+ */
+@InterfaceAudience.Public
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index 5716e3a..90fbbbf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -83,6 +83,10 @@ public class YarnConfiguration extends Configuration {
         new DeprecationDelta("yarn.client.max-nodemanagers-proxies",
             NM_CLIENT_MAX_NM_PROXIES)
     });
+    Configuration.addDeprecations(new DeprecationDelta[] {
+        new DeprecationDelta(RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
+            SYSTEM_METRICS_PUBLISHER_ENABLED)
+    });
   }
 
   //Configurations
@@ -133,6 +137,7 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_PREFIX = "yarn.resourcemanager.";
 
   public static final String RM_CLUSTER_ID = RM_PREFIX + "cluster-id";
+  public static final String DEFAULT_RM_CLUSTER_ID = "yarn_cluster";
 
   public static final String RM_HOSTNAME = RM_PREFIX + "hostname";
 
@@ -497,16 +502,37 @@ public class YarnConfiguration extends Configuration {
 
   /**
    *  The setting that controls whether yarn system metrics is published on the
-   *  timeline server or not by RM.
+   *  timeline server or not by RM. This configuration setting is for ATS V1.
+   *  This is now deprecated in favor of SYSTEM_METRICS_PUBLISHER_ENABLED.
+   */
+  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+      + "system-metrics-publisher.enabled";
+  public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
+      false;
+
+  /**
+   *  The setting that controls whether yarn system metrics are published on
+   *  the timeline server or not by RM and NM. This configuration setting is
+   *  for ATS v2.
    */
-  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
-      RM_PREFIX + "system-metrics-publisher.enabled";
-  public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+  public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+      + "system-metrics-publisher.enabled";
+  public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+
+  /**
+   * The setting that controls whether yarn container events are published to
+   * the timeline service or not by RM. This configuration setting is for
+   * ATS v2.
+   */
+  public static final String RM_PUBLISH_CONTAINER_EVENTS_ENABLED = YARN_PREFIX
+      + "rm.system-metrics-publisher.emit-container-events";
+  public static final boolean DEFAULT_RM_PUBLISH_CONTAINER_EVENTS_ENABLED =
+      false;
 
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
-  public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
-      10;
+  public static final int
+      DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE = 10;
 
   //RM delegation token related keys
   public static final String RM_DELEGATION_KEY_UPDATE_INTERVAL_KEY =
@@ -839,6 +865,11 @@ public class YarnConfiguration extends Configuration {
     NM_PREFIX + "container-manager.thread-count";
   public static final int DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT = 20;
   
+  /** Number of threads the collector service uses.*/
+  public static final String NM_COLLECTOR_SERVICE_THREAD_COUNT =
+      NM_PREFIX + "collector-service.thread-count";
+  public static final int DEFAULT_NM_COLLECTOR_SERVICE_THREAD_COUNT = 5;
+
   /** Number of threads used in cleanup.*/
   public static final String NM_DELETE_THREAD_COUNT = 
     NM_PREFIX +  "delete.thread-count";
@@ -866,6 +897,13 @@ public class YarnConfiguration extends Configuration {
   public static final String DEFAULT_NM_LOCALIZER_ADDRESS = "0.0.0.0:" +
     DEFAULT_NM_LOCALIZER_PORT;
   
+  /** Address where the collector service IPC is.*/
+  public static final String NM_COLLECTOR_SERVICE_ADDRESS =
+      NM_PREFIX + "collector-service.address";
+  public static final int DEFAULT_NM_COLLECTOR_SERVICE_PORT = 8048;
+  public static final String DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS =
+      "0.0.0.0:" + DEFAULT_NM_LOCALIZER_PORT;
+
   /** Interval in between cache cleanups.*/
   public static final String NM_LOCALIZER_CACHE_CLEANUP_INTERVAL_MS =
     NM_PREFIX + "localizer.cache.cleanup.interval-ms";
@@ -1835,7 +1873,7 @@ public class YarnConfiguration extends Configuration {
   public static final String TIMELINE_SERVICE_UI_WEB_PATH_PREFIX =
       TIMELINE_SERVICE_PREFIX + "ui-web-path.";
 
-  /** Timeline client settings */
+  /** Timeline client settings. */
   public static final String TIMELINE_SERVICE_CLIENT_PREFIX =
       TIMELINE_SERVICE_PREFIX + "client.";
 
@@ -1971,6 +2009,53 @@ public class YarnConfiguration extends Configuration {
       = TIMELINE_SERVICE_PREFIX
       + "entity-file.fs-support-append";
 
+  /**
+   * Settings for timeline service v2.0
+   */
+  public static final String TIMELINE_SERVICE_WRITER_CLASS =
+      TIMELINE_SERVICE_PREFIX + "writer.class";
+
+  public static final String TIMELINE_SERVICE_READER_CLASS =
+      TIMELINE_SERVICE_PREFIX + "reader.class";
+
+  /** The setting that controls how often the timeline collector flushes the
+   * timeline writer.
+   */
+  public static final String TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS =
+      TIMELINE_SERVICE_PREFIX + "writer.flush-interval-seconds";
+
+  public static final int
+      DEFAULT_TIMELINE_SERVICE_WRITER_FLUSH_INTERVAL_SECONDS = 60;
+
+  /**
+   * The name of the setting that controls how long the final value of
+   * a metric of a completed app is retained before merging
+   * into the flow sum.
+   */
+  public static final String APP_FINAL_VALUE_RETENTION_THRESHOLD =
+      TIMELINE_SERVICE_PREFIX
+      + "hbase.coprocessor.app-final-value-retention-milliseconds";
+
+  /**
+   * The setting that controls how long the final value of a metric of a
+   * completed app is retained before merging into the flow sum. Up to this
+   * time after an application completes, out-of-order values that arrive can
+   * be recognized and discarded, at the cost of increased storage.
+   */
+  public static final long DEFAULT_APP_FINAL_VALUE_RETENTION_THRESHOLD = 3 * 24
+      * 60 * 60 * 1000L;
+
+  public static final String ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS =
+      TIMELINE_SERVICE_PREFIX + "app-collector.linger-period.ms";
+
+  public static final int DEFAULT_ATS_APP_COLLECTOR_LINGER_PERIOD_IN_MS = 1000;
+
+  public static final String NUMBER_OF_ASYNC_ENTITIES_TO_MERGE =
+      TIMELINE_SERVICE_PREFIX
+          + "timeline-client.number-of-async-entities-to-merge";
+
+  public static final int DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE = 10;
+
   // mark app-history related configs @Private as application history is going
   // to be integrated into the timeline service
   @Private
@@ -2029,6 +2114,7 @@ public class YarnConfiguration extends Configuration {
   /** The listening endpoint for the timeline service application.*/
   public static final String TIMELINE_SERVICE_BIND_HOST =
       TIMELINE_SERVICE_PREFIX + "bind-host";
+  public static final String DEFAULT_TIMELINE_SERVICE_BIND_HOST = "0.0.0.0";
 
   /** The number of threads to handle client RPC API requests. */
   public static final String TIMELINE_SERVICE_HANDLER_THREAD_COUNT =
@@ -2224,6 +2310,16 @@ public class YarnConfiguration extends Configuration {
   public static final long    DEFAULT_TIMELINE_DELEGATION_TOKEN_MAX_LIFETIME =
       7*24*60*60*1000; // 7 days
 
+  // Timeline service v2 offline aggregation related keys
+  public static final String TIMELINE_OFFLINE_AGGREGATION_PREFIX =
+      YarnConfiguration.TIMELINE_SERVICE_PREFIX + "aggregation.offline.";
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR
+      = TIMELINE_OFFLINE_AGGREGATION_PREFIX
+          + "phoenix.connectionString";
+
+  public static final String PHOENIX_OFFLINE_STORAGE_CONN_STR_DEFAULT
+      = "jdbc:phoenix:localhost:2181:/hbase";
+
   // ///////////////////////////////
   // Shared Cache Configs
   // ///////////////////////////////
@@ -2778,6 +2874,53 @@ public class YarnConfiguration extends Configuration {
         YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED_DEFAULT);
   }
 
+  // helper methods for timeline service configuration
+  /**
+   * Returns whether the timeline service is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service is enabled.
+   */
+  public static boolean timelineServiceEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+      YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED);
+  }
+
+  /**
+   * Returns the timeline service version. It does not check whether the
+   * timeline service itself is enabled.
+   *
+   * @param conf the configuration
+   * @return the timeline service version as a float.
+   */
+  public static float getTimelineServiceVersion(Configuration conf) {
+    return conf.getFloat(TIMELINE_SERVICE_VERSION,
+        DEFAULT_TIMELINE_SERVICE_VERSION);
+  }
+
+  /**
+   * Returns whether the timeline service v.2 is enabled via configuration.
+   *
+   * @param conf the configuration
+   * @return whether the timeline service v.2 is enabled. V.2 refers to a
+   * version greater than or equal to 2 but smaller than 3.
+   */
+  public static boolean timelineServiceV2Enabled(Configuration conf) {
+    return timelineServiceEnabled(conf) &&
+        (int)getTimelineServiceVersion(conf) == 2;
+  }
+
+  /**
+   * Returns whether the system publisher is enabled.
+   *
+   * @param conf the configuration
+   * @return whether the system publisher is enabled.
+   */
+  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+        YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+  }
+
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {
     new YarnConfiguration(new Configuration()).writeXml(System.out);
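
For reference, a minimal sketch (not part of the patch) of how the new helper methods and the deprecated RM-scoped key are expected to behave. It assumes only the constants visible in this file plus the pre-existing TIMELINE_SERVICE_ENABLED and TIMELINE_SERVICE_VERSION keys; the handling of the old key relies on standard Configuration deprecation behavior, and the class name below is illustrative.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class TimelineV2ConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new YarnConfiguration();

    // Enable the timeline service and select the v2 implementation.
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
    conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
    System.out.println(YarnConfiguration.timelineServiceEnabled(conf));   // true
    System.out.println(YarnConfiguration.timelineServiceV2Enabled(conf)); // true

    // Writing the deprecated RM-scoped key should be mapped onto the new
    // yarn.system-metrics-publisher.enabled key by the DeprecationDelta above.
    conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
    System.out.println(
        YarnConfiguration.systemMetricsPublisherEnabled(conf));           // expected: true
  }
}
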

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
new file mode 100644
index 0000000..e0268a6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/TimelineServiceHelper.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+
+/**
+ * Helper class for Timeline service.
+ */
+@LimitedPrivate({ "MapReduce", "YARN" })
+public final class TimelineServiceHelper {
+
+  private TimelineServiceHelper() {
+    // Utility classes should not have a public or default constructor.
+  }
+
+  /**
+   * Cast a map to a HashMap, copying it only if it is not already a HashMap.
+   * @param originalMap the map that needs to be cast
+   * @param <E> key type
+   * @param <V> value type
+   * @return the cast HashMap object, or null if the input map is null
+   */
+  public static <E, V> HashMap<E, V> mapCastToHashMap(
+      Map<E, V> originalMap) {
+    return originalMap == null ? null : originalMap instanceof HashMap ?
+        (HashMap<E, V>) originalMap : new HashMap<E, V>(originalMap);
+  }
+
+}
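
A brief usage sketch for the helper above (not part of the patch). The class is marked @LimitedPrivate, so this is purely illustrative; the map contents and class name are made up.

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.yarn.util.TimelineServiceHelper;

public class MapCastSketch {
  public static void main(String[] args) {
    Map<String, Long> values = new TreeMap<String, Long>();
    values.put("MEMORY", 1024L);

    // A non-HashMap instance is copied into a new HashMap.
    HashMap<String, Long> copied = TimelineServiceHelper.mapCastToHashMap(values);

    // An existing HashMap is returned as-is, and null passes through unchanged.
    HashMap<String, Long> same = TimelineServiceHelper.mapCastToHashMap(copied);
    System.out.println(same == copied);                               // true
    System.out.println(TimelineServiceHelper.mapCastToHashMap(null)); // null
  }
}
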

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
index e5eab0a..6526bf9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
@@ -111,6 +111,7 @@ message AllocateResponseProto {
   repeated UpdatedContainerProto updated_containers = 10;
   optional hadoop.common.TokenProto am_rm_token = 12;
   optional PriorityProto application_priority = 13;
+  optional string collector_addr = 14;
   repeated UpdateContainerErrorProto update_errors = 15;
 }
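
For context, a minimal sketch of the Java accessors that protoc generates for the new field, assuming the standard proto-to-Java naming and the YarnServiceProtos outer class this .proto compiles into; the host:port value is made up, and the actual RM/AM call sites are not part of this hunk.

import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;

public class CollectorAddrSketch {
  public static void main(String[] args) {
    // The RM side would set the address of the per-application timeline
    // collector on the allocate response...
    AllocateResponseProto response = AllocateResponseProto.newBuilder()
        .setCollectorAddr("nm-host.example.com:8048")
        .build();

    // ...and the AM side would read it back when processing the response.
    if (response.hasCollectorAddr()) {
      System.out.println("Timeline collector at " + response.getCollectorAddr());
    }
  }
}
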
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
new file mode 100644
index 0000000..3244bc3
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineMetric.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric.Type;
+
+import org.junit.Test;
+
+public class TestTimelineMetric {
+
+  @Test
+  public void testTimelineMetricAggregation() {
+    long ts = System.currentTimeMillis();
+    // single_value metric add against null metric
+    TimelineMetric m1 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 10000L);
+    TimelineMetric aggregatedMetric = TimelineMetric.aggregateTo(m1, null);
+    assertEquals(10000L, aggregatedMetric.getSingleDataValue());
+
+    TimelineMetric m2 = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 20000L);
+    aggregatedMetric = TimelineMetric.aggregateTo(m2, aggregatedMetric);
+    assertEquals(30000L, aggregatedMetric.getSingleDataValue());
+
+    // stateful sum test
+    Map<Object, Object> state = new HashMap<>();
+    state.put(TimelineMetricOperation.PREV_METRIC_STATE_KEY, m2);
+    TimelineMetric m2New = getSingleValueMetric("MEGA_BYTES_MILLIS",
+        TimelineMetricOperation.SUM, ts, 10000L);
+    aggregatedMetric = TimelineMetric.aggregateTo(m2New, aggregatedMetric,
+        state);
+    assertEquals(20000L, aggregatedMetric.getSingleDataValue());
+
+    // single_value metric max against single_value metric
+    TimelineMetric m3 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, ts, 150L);
+    TimelineMetric aggregatedMax = TimelineMetric.aggregateTo(m3, null);
+    assertEquals(150L, aggregatedMax.getSingleDataValue());
+
+    TimelineMetric m4 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.MAX, ts, 170L);
+    aggregatedMax = TimelineMetric.aggregateTo(m4, aggregatedMax);
+    assertEquals(170L, aggregatedMax.getSingleDataValue());
+
+    // single_value metric avg against single_value metric
+    TimelineMetric m5 = getSingleValueMetric("TRANSFER_RATE",
+        TimelineMetricOperation.AVG, ts, 150L);
+    try {
+      TimelineMetric.aggregateTo(m5, null);
+      fail("Taking average among metrics is not supported! ");
+    } catch (UnsupportedOperationException e) {
+      // Expected
+    }
+
+  }
+
+  private static TimelineMetric getSingleValueMetric(String id,
+      TimelineMetricOperation op, long timestamp, long value) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.SINGLE_VALUE);
+    m.setRealtimeAggregationOp(op);
+    Map<Long, Number> metricValues = new HashMap<Long, Number>();
+    metricValues.put(timestamp, value);
+    m.setValues(metricValues);
+    return m;
+  }
+
+  private static TimelineMetric getTimeSeriesMetric(String id,
+      TimelineMetricOperation op, Map<Long, Number> metricValues) {
+    TimelineMetric m = new TimelineMetric();
+    m.setId(id);
+    m.setType(Type.TIME_SERIES);
+    m.setRealtimeAggregationOp(op);
+    m.setValues(metricValues);
+    return m;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
index 818f4e0..0c40fa9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
@@ -101,6 +101,9 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
 
     // Ignore all YARN Application Timeline Service (version 1) properties
     configurationPrefixToSkipCompare.add("yarn.timeline-service.");
+    // skip deprecated RM_SYSTEM_METRICS_PUBLISHER_ENABLED
+    configurationPropsToSkipCompare
+        .add(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
 
     // Used as Java command line properties, not XML
     configurationPrefixToSkipCompare.add("yarn.app.container");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
index 2c1e36d..9dde71d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/pom.xml
@@ -70,6 +70,16 @@
       <groupId>commons-io</groupId>
       <artifactId>commons-io</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
index aca3a1b..17dae6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
@@ -103,7 +103,6 @@ import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.log4j.LogManager;
 
@@ -220,6 +219,8 @@ public class ApplicationMaster {
   // Tracking url to which app master publishes info for clients to monitor
   private String appMasterTrackingUrl = "";
 
+  private boolean timelineServiceV2 = false;
+
   // App Master configuration
   // No. of containers to run shell command on
   @VisibleForTesting
@@ -553,6 +554,14 @@ public class ApplicationMaster {
         cliParser.getOptionValue("container_max_retries", "0"));
     containrRetryInterval = Integer.parseInt(cliParser.getOptionValue(
         "container_retry_interval", "0"));
+
+    if (YarnConfiguration.timelineServiceEnabled(conf)) {
+      timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
+    } else {
+      timelineClient = null;
+      LOG.warn("Timeline service is not enabled");
+    }
+
     return true;
   }
 
@@ -600,7 +609,6 @@ public class ApplicationMaster {
         UserGroupInformation.createRemoteUser(appSubmitterUserName);
     appSubmitterUgi.addCredentials(credentials);
 
-
     AMRMClientAsync.AbstractCallbackHandler allocListener =
         new RMCallbackHandler();
     amRMClient = AMRMClientAsync.createAMRMClientAsync(1000, allocListener);
@@ -613,9 +621,18 @@ public class ApplicationMaster {
     nmClientAsync.start();
 
     startTimelineClient(conf);
+    if (timelineServiceV2) {
+      // need to bind timelineClient
+      amRMClient.registerTimelineClient(timelineClient);
+    }
     if(timelineClient != null) {
-      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_START);
+      } else {
+        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+            DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
+      }
     }
 
     // Setup local RPC Server to accept status requests directly from clients
@@ -685,10 +702,16 @@ public class ApplicationMaster {
       appSubmitterUgi.doAs(new PrivilegedExceptionAction<Void>() {
         @Override
         public Void run() throws Exception {
-          if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
+          if (YarnConfiguration.timelineServiceEnabled(conf)) {
             // Creating the Timeline Client
-            timelineClient = TimelineClient.createTimelineClient();
+            if (timelineServiceV2) {
+              timelineClient = TimelineClient.createTimelineClient(
+                  appAttemptID.getApplicationId());
+              LOG.info("Timeline service V2 client is enabled");
+            } else {
+              timelineClient = TimelineClient.createTimelineClient();
+              LOG.info("Timeline service V1 client is enabled");
+            }
             timelineClient.init(conf);
             timelineClient.start();
           } else {
@@ -718,9 +741,14 @@ public class ApplicationMaster {
       } catch (InterruptedException ex) {}
     }
 
-    if(timelineClient != null) {
-      publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
-          DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+    if (timelineClient != null) {
+      if (timelineServiceV2) {
+        publishApplicationAttemptEventOnTimelineServiceV2(
+            DSEvent.DS_APP_ATTEMPT_END);
+      } else {
+        publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
+            DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
+      }
     }
 
     // Join all launched threads
@@ -826,8 +854,12 @@ public class ApplicationMaster {
               + containerStatus.getContainerId());
         }
         if(timelineClient != null) {
-          publishContainerEndEvent(
-              timelineClient, containerStatus, domainId, appSubmitterUgi);
+          if (timelineServiceV2) {
+            publishContainerEndEventOnTimelineServiceV2(containerStatus);
+          } else {
+            publishContainerEndEvent(
+                timelineClient, containerStatus, domainId, appSubmitterUgi);
+          }
         }
       }
       
@@ -948,12 +980,18 @@ public class ApplicationMaster {
       }
       Container container = containers.get(containerId);
       if (container != null) {
-        applicationMaster.nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
+        applicationMaster.nmClientAsync.getContainerStatusAsync(
+            containerId, container.getNodeId());
       }
       if(applicationMaster.timelineClient != null) {
-        applicationMaster.publishContainerStartEvent(
-            applicationMaster.timelineClient, container,
-            applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        if (applicationMaster.timelineServiceV2) {
+          applicationMaster.publishContainerStartEventOnTimelineServiceV2(
+              container);
+        } else {
+          applicationMaster.publishContainerStartEvent(
+              applicationMaster.timelineClient, container,
+              applicationMaster.domainId, applicationMaster.appSubmitterUgi);
+        }
       }
     }
 
@@ -1261,7 +1299,7 @@ public class ApplicationMaster {
       LOG.error("App Attempt "
           + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
           + " event could not be published for "
-          + appAttemptId.toString(), e);
+          + appAttemptID, e);
     }
   }
 
@@ -1308,4 +1346,109 @@ public class ApplicationMaster {
             shellId);
     return new Thread(runnableLaunchContainer);
   }
+
+  private void publishContainerStartEventOnTimelineServiceV2(
+      Container container) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(container.getId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    long ts = System.currentTimeMillis();
+    entity.setCreatedTime(ts);
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(ts);
+    event.setId(DSEvent.DS_CONTAINER_START.toString());
+    event.addInfo("Node", container.getNodeId().toString());
+    event.addInfo("Resources", container.getResource().toString());
+    entity.addEvent(event);
+
+    try {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container start event could not be published for "
+          + container.getId().toString(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  private void publishContainerEndEventOnTimelineServiceV2(
+      final ContainerStatus container) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(container.getContainerId().toString());
+    entity.setType(DSEntity.DS_CONTAINER.toString());
+    //entity.setDomainId(domainId);
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new  org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setTimestamp(System.currentTimeMillis());
+    event.setId(DSEvent.DS_CONTAINER_END.toString());
+    event.addInfo("State", container.getState().name());
+    event.addInfo("Exit Status", container.getExitStatus());
+    entity.addEvent(event);
+
+    try {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntities(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("Container end event could not be published for "
+          + container.getContainerId().toString(),
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
+  private void publishApplicationAttemptEventOnTimelineServiceV2(
+      DSEvent appEvent) {
+    final org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
+        entity =
+            new org.apache.hadoop.yarn.api.records.timelineservice.
+            TimelineEntity();
+    entity.setId(appAttemptID.toString());
+    entity.setType(DSEntity.DS_APP_ATTEMPT.toString());
+    long ts = System.currentTimeMillis();
+    if (appEvent == DSEvent.DS_APP_ATTEMPT_START) {
+      entity.setCreatedTime(ts);
+    }
+    entity.addInfo("user", appSubmitterUgi.getShortUserName());
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent event =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent();
+    event.setId(appEvent.toString());
+    event.setTimestamp(ts);
+    entity.addEvent(event);
+
+    try {
+      appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
+        @Override
+        public TimelinePutResponse run() throws Exception {
+          timelineClient.putEntitiesAsync(entity);
+          return null;
+        }
+      });
+    } catch (Exception e) {
+      LOG.error("App Attempt "
+          + (appEvent.equals(DSEvent.DS_APP_ATTEMPT_START) ? "start" : "end")
+          + " event could not be published for "
+          + appAttemptID,
+          e instanceof UndeclaredThrowableException ? e.getCause() : e);
+    }
+  }
+
 }
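
Condensed from the hunks above, a minimal sketch of the v2 publishing path the distributed shell AM now follows. The entity/event ids are literal stand-ins for the DSEntity/DSEvent values used in the patch, the UGI doAs wrapper and error handling are omitted, and in the real AM the client is also registered with AMRMClientAsync (registerTimelineClient) so it learns the per-application collector address.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
import org.apache.hadoop.yarn.client.api.TimelineClient;

public class TimelineV2PublishSketch {
  // Publishes a single "attempt started" event for the given attempt.
  static void publishAttemptStart(ApplicationAttemptId appAttemptId,
      Configuration conf) throws Exception {
    // A v2 client is bound to the application it writes for.
    TimelineClient client =
        TimelineClient.createTimelineClient(appAttemptId.getApplicationId());
    client.init(conf);
    client.start();

    TimelineEntity entity = new TimelineEntity();
    entity.setId(appAttemptId.toString());
    entity.setType("DS_APP_ATTEMPT");            // stand-in for DSEntity.DS_APP_ATTEMPT
    entity.setCreatedTime(System.currentTimeMillis());

    TimelineEvent event = new TimelineEvent();
    event.setId("DS_APP_ATTEMPT_START");         // stand-in for DSEvent.DS_APP_ATTEMPT_START
    event.setTimestamp(System.currentTimeMillis());
    entity.addEvent(event);

    client.putEntitiesAsync(entity);             // non-blocking write to the app collector
    client.stop();
  }
}
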



