hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vino...@apache.org
Subject svn commit: r1570922 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline...
Date Sat, 22 Feb 2014 20:55:07 GMT
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,873 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.WriteBatch;
+
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;
+
+/**
+ * An implementation of a timeline store backed by leveldb.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LeveldbTimelineStore extends AbstractService
+    implements TimelineStore {
+  private static final Log LOG = LogFactory
+      .getLog(LeveldbTimelineStore.class);
+
+  // Name of the leveldb database created underneath the configured
+  // timeline store path (see serviceInit).
+  private static final String FILENAME = "leveldb-timeline-store.ldb";
+
+  // Leading key element identifying the kind of db entry:
+  //   START_TIME_LOOKUP_PREFIX - entity -> start time lookup entries
+  //   ENTITY_ENTRY_PREFIX      - per-entity entries (events, filters, ...)
+  //   INDEXED_ENTRY_PREFIX     - copies of entity entries indexed by a
+  //                              primary filter name and value
+  private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
+  private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
+  private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
+
+  // Single-byte column markers that follow the entity portion of an entity
+  // entry key and select how the rest of the key is parsed.  Note that
+  // OTHER_INFO_COLUMN and INDEXED_ENTRY_PREFIX share the byte "i"; they are
+  // used at different positions in a key.
+  private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes();
+  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
+  private static final byte[] RELATED_COLUMN = "r".getBytes();
+  private static final byte[] TIME_COLUMN = "t".getBytes();
+
+  // Value stored for entries whose information is carried entirely by the
+  // key (related entity and primary filter entries).
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  // Size passed to the LRUMap backing startTimeCache.
+  private static final int START_TIME_CACHE_SIZE = 10000;
+
+  // Cache of entity -> start time, wrapped in a synchronized map because
+  // LRUMap is a raw (non-generic) type, hence the unchecked suppression.
+  @SuppressWarnings("unchecked")
+  private final Map<EntityIdentifier, Long> startTimeCache =
+      Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
+
+  // Handle to the leveldb database; assigned in serviceInit and released
+  // in serviceStop.
+  private DB db;
+
+  /**
+   * Creates the store, registering the class name as the service name.
+   * The database itself is not opened until the service is initialized.
+   */
+  public LeveldbTimelineStore() {
+    super(LeveldbTimelineStore.class.getName());
+  }
+
+  /**
+   * Opens the leveldb database located under the directory configured by
+   * {@link YarnConfiguration#TIMELINE_SERVICE_LEVELDB_PATH}, creating the
+   * directory and the database if they do not exist yet.
+   *
+   * @param conf The service configuration
+   * @throws IOException if no path is configured or the directory cannot
+   *                     be created
+   * @throws Exception if the database cannot be opened or the parent
+   *                   initialization fails
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
+    if (path == null || path.isEmpty()) {
+      // fail with a meaningful message instead of an NPE from new File(null)
+      throw new IOException("No leveldb timeline store path configured ("
+          + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH + ")");
+    }
+    File p = new File(path);
+    // mkdirs() also returns false when the directory was created
+    // concurrently, so re-check before treating it as a failure
+    if (!p.exists() && !p.mkdirs() && !p.isDirectory()) {
+      throw new IOException("Couldn't create directory for leveldb " +
+          "timeline store " + path);
+    }
+    LOG.info("Using leveldb path " + path);
+    Options options = new Options();
+    options.createIfMissing(true);
+    JniDBFactory factory = new JniDBFactory();
+    db = factory.open(new File(path, FILENAME), options);
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Releases the leveldb handle and then stops the parent service.
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    // NOTE(review): IOUtils.cleanup is expected to close the db and log,
+    // rather than propagate, any failure -- confirm against Hadoop IOUtils
+    IOUtils.cleanup(LOG, db);
+    super.serviceStop();
+  }
+
+  /**
+   * Collects key elements and concatenates them into a single db key.
+   * Elements added as strings are followed by a 0x0 separator byte; raw
+   * byte elements are not, unless explicitly requested.
+   */
+  private static class KeyBuilder {
+    private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
+    private byte[][] elements;
+    private boolean[] separated;
+    private int count;
+    private int totalLength;
+
+    /**
+     * @param size The maximum number of elements that can be added
+     */
+    public KeyBuilder(int size) {
+      elements = new byte[size][];
+      separated = new boolean[size];
+      count = 0;
+      totalLength = 0;
+    }
+
+    /** Creates a builder with room for the default number of elements. */
+    public static KeyBuilder newInstance() {
+      return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
+    }
+
+    /** Appends a string element, terminated by a separator. */
+    public KeyBuilder add(String s) {
+      return add(s.getBytes(), true);
+    }
+
+    /** Appends a raw element without a separator. */
+    public KeyBuilder add(byte[] t) {
+      return add(t, false);
+    }
+
+    /** Appends an element, optionally followed by a separator. */
+    public KeyBuilder add(byte[] t, boolean sep) {
+      elements[count] = t;
+      separated[count] = sep;
+      totalLength += t.length;
+      if (sep) {
+        totalLength++;
+      }
+      count++;
+      return this;
+    }
+
+    /**
+     * Builds the complete key; the separator of the final element is
+     * omitted.
+     */
+    public byte[] getBytes() throws IOException {
+      return join(false);
+    }
+
+    /**
+     * Builds a key prefix for lookups; the separator of the final element
+     * is kept so that only whole-element matches are found.
+     */
+    public byte[] getBytesForLookup() throws IOException {
+      return join(true);
+    }
+
+    /**
+     * Concatenates all elements added so far.
+     *
+     * @param separatorAfterLast whether the final element's separator (if
+     *                           it has one) is written
+     */
+    private byte[] join(boolean separatorAfterLast) throws IOException {
+      ByteArrayOutputStream out = new ByteArrayOutputStream(totalLength);
+      int lastIndex = count - 1;
+      for (int i = 0; i <= lastIndex; i++) {
+        out.write(elements[i]);
+        if (separated[i] && (separatorAfterLast || i < lastIndex)) {
+          out.write(0x0);
+        }
+      }
+      return out.toByteArray();
+    }
+  }
+
+  /**
+   * Sequentially reads elements back out of a key produced by KeyBuilder,
+   * starting at a given offset.
+   */
+  private static class KeyParser {
+    private final byte[] b;
+    private int offset;
+
+    /**
+     * @param b The key to parse
+     * @param offset The position of the first element to read
+     */
+    public KeyParser(byte[] b, int offset) {
+      this.b = b;
+      this.offset = offset;
+    }
+
+    /**
+     * Reads the bytes up to the next 0x0 separator (or the end of the key)
+     * as a string and moves past the separator.
+     *
+     * @throws IOException if the key has already been fully consumed
+     */
+    public String getNextString() throws IOException {
+      if (offset >= b.length) {
+        throw new IOException(
+            "tried to read nonexistent string from byte array");
+      }
+      int i = 0;
+      while (offset+i < b.length && b[offset+i] != 0x0) {
+        i++;
+      }
+      String s = new String(b, offset, i);
+      offset = offset + i + 1;
+      return s;
+    }
+
+    /**
+     * Reads the next 8 bytes as a reverse-ordered long.
+     *
+     * @throws IOException if fewer than 8 bytes remain
+     */
+    public long getNextLong() throws IOException {
+      // a long occupying exactly the last 8 bytes is still readable, so
+      // only fail when the read would run past the end of the array
+      if (offset+8 > b.length) {
+        throw new IOException("byte array ran out when trying to read long");
+      }
+      long l = readReverseOrderedLong(b, offset);
+      offset += 8;
+      return l;
+    }
+
+    /** Returns the position of the next unread byte. */
+    public int getOffset() {
+      return offset;
+    }
+  }
+
+  /**
+   * Looks up a single entity by id and type.  Returns null when no start
+   * time is known for the entity.
+   */
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fields) throws IOException {
+    DBIterator entityIterator = null;
+    try {
+      byte[] reversedStart =
+          getStartTime(entityId, entityType, null, null, null);
+      if (reversedStart != null) {
+        // all entries of the entity share this key prefix
+        KeyBuilder lookup = KeyBuilder.newInstance();
+        lookup.add(ENTITY_ENTRY_PREFIX);
+        lookup.add(entityType);
+        lookup.add(reversedStart);
+        lookup.add(entityId);
+        byte[] entityPrefix = lookup.getBytesForLookup();
+
+        entityIterator = db.iterator();
+        entityIterator.seek(entityPrefix);
+
+        long startTime = readReverseOrderedLong(reversedStart, 0);
+        return getEntity(entityId, entityType, startTime, fields,
+            entityIterator, entityPrefix, entityPrefix.length);
+      }
+      return null;
+    } finally {
+      IOUtils.cleanup(LOG, entityIterator);
+    }
+  }
+
+  /**
+   * Builds an entity from the db entries the iterator is positioned on,
+   * consuming entries for as long as their keys start with the given
+   * prefix.  Only the requested fields are populated; fields that were not
+   * requested are set to null.  A null field set means all fields.
+   *
+   * @param entityId The id to set on the returned entity
+   * @param entityType The type to set on the returned entity
+   * @param startTime The start time to set on the returned entity
+   * @param fields The fields to populate, or null for all
+   * @param iterator An iterator positioned at the entity's first entry
+   * @param prefix A byte array whose first prefixlen bytes identify the
+   *               entity's entries
+   * @param prefixlen The number of prefix bytes to compare
+   * @return The entity (never null)
+   */
+  private static TimelineEntity getEntity(String entityId, String entityType,
+      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      byte[] prefix, int prefixlen) throws IOException {
+    EnumSet<Field> wanted =
+        fields == null ? EnumSet.allOf(Field.class) : fields;
+
+    // EVENTS takes precedence over LAST_EVENT_ONLY when both are present
+    final boolean allEvents = wanted.contains(Field.EVENTS);
+    final boolean newestEventOnly =
+        !allEvents && wanted.contains(Field.LAST_EVENT_ONLY);
+    final boolean wantRelated = wanted.contains(Field.RELATED_ENTITIES);
+    final boolean wantPrimaryFilters =
+        wanted.contains(Field.PRIMARY_FILTERS);
+    final boolean wantOtherInfo = wanted.contains(Field.OTHER_INFO);
+
+    TimelineEntity result = new TimelineEntity();
+    if (allEvents || newestEventOnly) {
+      result.setEvents(new ArrayList<TimelineEvent>());
+    } else {
+      result.setEvents(null);
+    }
+    if (!wantRelated) {
+      result.setRelatedEntities(null);
+    }
+    if (!wantPrimaryFilters) {
+      result.setPrimaryFilters(null);
+    }
+    if (wantOtherInfo) {
+      result.setOtherInfo(new HashMap<String, Object>());
+    } else {
+      result.setOtherInfo(null);
+    }
+
+    // walk the entity's entries; the byte after the prefix names the column
+    while (iterator.hasNext()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefixlen, key)) {
+        break;
+      }
+      byte column = key[prefixlen];
+      if (column == PRIMARY_FILTER_COLUMN[0]) {
+        if (wantPrimaryFilters) {
+          addPrimaryFilter(result, key,
+              prefixlen + PRIMARY_FILTER_COLUMN.length);
+        }
+      } else if (column == OTHER_INFO_COLUMN[0]) {
+        if (wantOtherInfo) {
+          String infoName = parseRemainingKey(key,
+              prefixlen + OTHER_INFO_COLUMN.length);
+          result.addOtherInfo(infoName,
+              GenericObjectMapper.read(iterator.peekNext().getValue()));
+        }
+      } else if (column == RELATED_COLUMN[0]) {
+        if (wantRelated) {
+          addRelatedEntity(result, key, prefixlen + RELATED_COLUMN.length);
+        }
+      } else if (column == TIME_COLUMN[0]) {
+        // in last-event mode only the first event encountered is kept
+        boolean takeEvent = allEvents
+            || (newestEventOnly && result.getEvents().size() == 0);
+        if (takeEvent) {
+          TimelineEvent parsed = getEntityEvent(null, key,
+              prefixlen + TIME_COLUMN.length,
+              iterator.peekNext().getValue());
+          if (parsed != null) {
+            result.addEvent(parsed);
+          }
+        }
+      } else {
+        LOG.warn(String.format("Found unexpected column for entity %s of " +
+            "type %s (0x%02x)", entityId, entityType, column));
+      }
+      iterator.next();
+    }
+
+    result.setEntityId(entityId);
+    result.setEntityType(entityType);
+    result.setStartTime(startTime);
+
+    return result;
+  }
+
+  /**
+   * Retrieves the events of the given entities of one type.  Entities for
+   * which no start time is known are skipped.
+   *
+   * @param entityType The type of the entities
+   * @param entityIds The ids of the entities; null or empty yields an empty
+   *                  result
+   * @param limit The maximum number of events per entity, or null for
+   *              DEFAULT_LIMIT
+   * @param windowStart If not null, iteration stops once event keys sort
+   *                    after this timestamp
+   * @param windowEnd The latest event timestamp to retrieve, or null for no
+   *                  restriction
+   * @param eventType If not null, only events of these types are returned
+   * @return The events found, grouped per entity
+   * @throws IOException
+   */
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventType) throws IOException {
+    TimelineEvents events = new TimelineEvents();
+    if (entityIds == null || entityIds.isEmpty()) {
+      return events;
+    }
+    // the defaults are the same for every entity, so resolve them once
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    // create a lexicographically-ordered map from start time to entities
+    Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
+        List<EntityIdentifier>>(new Comparator<byte[]>() {
+          @Override
+          public int compare(byte[] o1, byte[] o2) {
+            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+                o2.length);
+          }
+        });
+    // look up start times for the specified entities
+    // skip entities with no start time
+    for (String entity : entityIds) {
+      byte[] startTime = getStartTime(entity, entityType, null, null, null);
+      if (startTime != null) {
+        List<EntityIdentifier> entities = startTimeMap.get(startTime);
+        if (entities == null) {
+          entities = new ArrayList<EntityIdentifier>();
+          startTimeMap.put(startTime, entities);
+        }
+        entities.add(new EntityIdentifier(entity, entityType));
+      }
+    }
+    for (Entry<byte[], List<EntityIdentifier>> entry :
+        startTimeMap.entrySet()) {
+      // look up the events matching the given parameters (limit,
+      // start time, end time, event types) for entities whose start times
+      // were found and add the entities to the return list
+      byte[] revStartTime = entry.getKey();
+      for (EntityIdentifier entityID : entry.getValue()) {
+        EventsOfOneEntity entity = new EventsOfOneEntity();
+        entity.setEntityId(entityID.getId());
+        entity.setEntityType(entityType);
+        events.addEvent(entity);
+        KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+            .add(entityType).add(revStartTime).add(entityID.getId())
+            .add(TIME_COLUMN);
+        byte[] prefix = kb.getBytesForLookup();
+        // timestamps are stored reverse-ordered, so the scan starts at the
+        // window end and proceeds towards the window start
+        kb.add(writeReverseOrderedLong(windowEnd));
+        byte[] first = kb.getBytesForLookup();
+        byte[] last = null;
+        if (windowStart != null) {
+          last = KeyBuilder.newInstance().add(prefix)
+              .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+        }
+        // one iterator is opened per entity; close each of them rather
+        // than only the last one
+        DBIterator iterator = null;
+        try {
+          iterator = db.iterator();
+          for (iterator.seek(first); entity.getEvents().size() < limit &&
+              iterator.hasNext(); iterator.next()) {
+            byte[] key = iterator.peekNext().getKey();
+            if (!prefixMatches(prefix, prefix.length, key) ||
+                (last != null && WritableComparator.compareBytes(key, 0,
+                    key.length, last, 0, last.length) > 0)) {
+              break;
+            }
+            TimelineEvent event = getEntityEvent(eventType, key,
+                prefix.length, iterator.peekNext().getValue());
+            if (event != null) {
+              entity.addEvent(event);
+            }
+          }
+        } finally {
+          IOUtils.cleanup(LOG, iterator);
+        }
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Tests whether the first prefixlen bytes of b are identical to the first
+   * prefixlen bytes of prefix.  An array shorter than prefixlen never
+   * matches.
+   */
+  private static boolean prefixMatches(byte[] prefix, int prefixlen,
+      byte[] b) {
+    return b.length >= prefixlen
+        && WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
+            prefixlen) == 0;
+  }
+
+  /**
+   * Retrieves entities of the given type, choosing the key space to scan
+   * based on whether a primary filter was supplied.
+   */
+  @Override
+  public TimelineEntities getEntities(String entityType,
+      Long limit, Long windowStart, Long windowEnd,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
+    byte[] base;
+    if (primaryFilter != null) {
+      // with a primary filter, scan the index entries:
+      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+      // ENTITY_ENTRY_PREFIX
+      KeyBuilder indexKey = KeyBuilder.newInstance();
+      indexKey.add(INDEXED_ENTRY_PREFIX);
+      indexKey.add(primaryFilter.getName());
+      indexKey.add(GenericObjectMapper.write(primaryFilter.getValue()), true);
+      indexKey.add(ENTITY_ENTRY_PREFIX);
+      base = indexKey.getBytesForLookup();
+    } else {
+      // without one, scan the entity entries directly
+      base = ENTITY_ENTRY_PREFIX;
+    }
+    return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+        secondaryFilters, fields);
+  }
+
+  /**
+   * Retrieves a list of entities satisfying given parameters.
+   *
+   * Secondary filters are checked against an entity's other info and, if
+   * the name is not found there, against its primary filters.  Both are
+   * therefore loaded whenever secondary filters are given, and removed
+   * again from the returned entities if the caller did not request them.
+   *
+   * @param base A byte array prefix for the lookup
+   * @param entityType The type of the entity
+   * @param limit A limit on the number of entities to return
+   * @param starttime The earliest entity start time to retrieve (exclusive)
+   * @param endtime The latest entity start time to retrieve (inclusive)
+   * @param secondaryFilters Filter pairs that the entities should match
+   * @param fields The set of fields to retrieve
+   * @return A list of entities
+   * @throws IOException
+   */
+  private TimelineEntities getEntityByTime(byte[] base,
+      String entityType, Long limit, Long starttime, Long endtime,
+      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
+      throws IOException {
+    // a null field set already means "all fields"; otherwise make sure the
+    // fields needed for filtering are read, remembering which were added
+    EnumSet<Field> queryFields = fields;
+    boolean stripPrimaryFilters = false;
+    boolean stripOtherInfo = false;
+    if (fields != null && secondaryFilters != null &&
+        !secondaryFilters.isEmpty()) {
+      queryFields = EnumSet.copyOf(fields);
+      stripPrimaryFilters = queryFields.add(Field.PRIMARY_FILTERS);
+      stripOtherInfo = queryFields.add(Field.OTHER_INFO);
+    }
+
+    DBIterator iterator = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+      // only db keys matching the prefix (base + entity type) will be parsed
+      byte[] prefix = kb.getBytesForLookup();
+      if (endtime == null) {
+        // if end time is null, place no restriction on end time
+        endtime = Long.MAX_VALUE;
+      }
+      // using end time, construct a first key that will be seeked to
+      byte[] revts = writeReverseOrderedLong(endtime);
+      kb.add(revts);
+      byte[] first = kb.getBytesForLookup();
+      byte[] last = null;
+      if (starttime != null) {
+        // if start time is not null, set a last key that will not be
+        // iterated past
+        last = KeyBuilder.newInstance().add(base).add(entityType)
+            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+      }
+      if (limit == null) {
+        // if limit is not specified, use the default
+        limit = DEFAULT_LIMIT;
+      }
+
+      TimelineEntities entities = new TimelineEntities();
+      iterator = db.iterator();
+      iterator.seek(first);
+      // iterate until one of the following conditions is met: limit is
+      // reached, there are no more keys, the key prefix no longer matches,
+      // or a start time has been specified and reached/exceeded
+      while (entities.getEntities().size() < limit && iterator.hasNext()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+            WritableComparator.compareBytes(key, 0, key.length, last, 0,
+                last.length) > 0)) {
+          break;
+        }
+        // read the start time and entityId from the current key
+        KeyParser kp = new KeyParser(key, prefix.length);
+        Long startTime = kp.getNextLong();
+        String entityId = kp.getNextString();
+        // parse the entity that owns this key, iterating over all keys for
+        // the entity
+        TimelineEntity entity = getEntity(entityId, entityType, startTime,
+            queryFields, iterator, key, kp.getOffset());
+        if (entity == null) {
+          continue;
+        }
+        // determine if the retrieved entity matches the provided secondary
+        // filters, and if so add it to the list of entities to return
+        if (matchesSecondaryFilters(entity, secondaryFilters)) {
+          // drop the fields that were only loaded for filtering
+          if (stripPrimaryFilters) {
+            entity.setPrimaryFilters(null);
+          }
+          if (stripOtherInfo) {
+            entity.setOtherInfo(null);
+          }
+          entities.addEntity(entity);
+        }
+      }
+      return entities;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Returns true if every given filter is satisfied by the entity: either
+   * its other info maps the filter name to an equal value, or (when other
+   * info has no value for the name) its primary filters for that name
+   * contain the filter value.  A filter whose name is found in neither
+   * place is not satisfied.  A null filter collection matches everything.
+   */
+  private static boolean matchesSecondaryFilters(TimelineEntity entity,
+      Collection<NameValuePair> secondaryFilters) {
+    if (secondaryFilters == null) {
+      return true;
+    }
+    Map<String, Object> otherInfo = entity.getOtherInfo();
+    Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
+    for (NameValuePair filter : secondaryFilters) {
+      Object v = otherInfo == null ? null : otherInfo.get(filter.getName());
+      if (v == null) {
+        Set<Object> vs = primaryFilters == null ? null :
+            primaryFilters.get(filter.getName());
+        if (vs == null || !vs.contains(filter.getValue())) {
+          return false;
+        }
+      } else if (!v.equals(filter.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Put a single entity.  All db entries for the entity (start time lookup,
+   * events, related entities, primary filters, other info, and their
+   * primary filter index copies) are collected in one write batch that is
+   * written at the end.  If there is an error, add a TimelinePutError to
+   * the given response instead of throwing.
+   *
+   * @param entity The entity to store
+   * @param response The response that collects errors
+   */
+  private void put(TimelineEntity entity, TimelinePutResponse response) {
+    WriteBatch writeBatch = null;
+    try {
+      writeBatch = db.createWriteBatch();
+      List<TimelineEvent> events = entity.getEvents();
+      // look up the start time for the entity
+      // (this may add a start time entry to the batch and to the cache)
+      byte[] revStartTime = getStartTime(entity.getEntityId(),
+          entity.getEntityType(), entity.getStartTime(), events,
+          writeBatch);
+      if (revStartTime == null) {
+        // if no start time is found, add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_START_TIME);
+        response.addError(error);
+        return;
+      }
+      Long revStartTimeLong = readReverseOrderedLong(revStartTime, 0);
+      Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
+
+      // write event entries
+      if (events != null && !events.isEmpty()) {
+        for (TimelineEvent event : events) {
+          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+          byte[] key = createEntityEventKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, revts,
+              event.getEventType());
+          byte[] value = GenericObjectMapper.write(event.getEventInfo());
+          writeBatch.put(key, value);
+          // duplicate the entry under every primary filter of the entity
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+
+      // write related entity entries
+      Map<String, Set<String>> relatedEntities =
+          entity.getRelatedEntities();
+      if (relatedEntities != null && !relatedEntities.isEmpty()) {
+        for (Entry<String, Set<String>> relatedEntityList :
+            relatedEntities.entrySet()) {
+          String relatedEntityType = relatedEntityList.getKey();
+          for (String relatedEntityId : relatedEntityList.getValue()) {
+            // look up start time of related entity
+            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+                relatedEntityType, null, null, writeBatch);
+            if (relatedEntityStartTime == null) {
+              // if start time is not found, set start time of the related
+              // entity to the start time of this entity, and write it to the
+              // db and the cache
+              relatedEntityStartTime = revStartTime;
+              writeBatch.put(createStartTimeLookupKey(relatedEntityId,
+                  relatedEntityType), relatedEntityStartTime);
+              startTimeCache.put(new EntityIdentifier(relatedEntityId,
+                  relatedEntityType), revStartTimeLong);
+            }
+            // write reverse entry (related entity -> entity)
+            byte[] key = createReleatedEntityKey(relatedEntityId,
+                relatedEntityType, relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType());
+            writeBatch.put(key, EMPTY_BYTES);
+            // TODO: write forward entry (entity -> related entity)?
+          }
+        }
+      }
+
+      // write primary filter entries
+      if (primaryFilters != null && !primaryFilters.isEmpty()) {
+        for (Entry<String, Set<Object>> primaryFilter :
+            primaryFilters.entrySet()) {
+          for (Object primaryFilterValue : primaryFilter.getValue()) {
+            byte[] key = createPrimaryFilterKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime,
+                primaryFilter.getKey(), primaryFilterValue);
+            writeBatch.put(key, EMPTY_BYTES);
+            writePrimaryFilterEntries(writeBatch, primaryFilters, key,
+                EMPTY_BYTES);
+          }
+        }
+      }
+
+      // write other info entries
+      Map<String, Object> otherInfo = entity.getOtherInfo();
+      if (otherInfo != null && !otherInfo.isEmpty()) {
+        for (Entry<String, Object> i : otherInfo.entrySet()) {
+          byte[] key = createOtherInfoKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, i.getKey());
+          byte[] value = GenericObjectMapper.write(i.getValue());
+          writeBatch.put(key, value);
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+      // NOTE(review): start times were already placed in startTimeCache
+      // above, before this write; if the write fails the cache may hold
+      // start times that never reached the db -- confirm this is acceptable
+      db.write(writeBatch);
+    } catch (IOException e) {
+      LOG.error("Error putting entity " + entity.getEntityId() +
+          " of type " + entity.getEntityType(), e);
+      TimelinePutError error = new TimelinePutError();
+      error.setEntityId(entity.getEntityId());
+      error.setEntityType(entity.getEntityType());
+      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+      response.addError(error);
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  /**
+   * Adds, for every primary filter name / value pair, a copy of the given
+   * key / value entry to the write batch under the corresponding index key.
+   * Does nothing when there are no primary filters.
+   */
+  private static void writePrimaryFilterEntries(WriteBatch writeBatch,
+      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+      throws IOException {
+    if (primaryFilters == null || primaryFilters.isEmpty()) {
+      return;
+    }
+    for (Entry<String, Set<Object>> filter : primaryFilters.entrySet()) {
+      for (Object filterValue : filter.getValue()) {
+        byte[] indexedKey =
+            addPrimaryFilterToKey(filter.getKey(), filterValue, key);
+        writeBatch.put(indexedKey, value);
+      }
+    }
+  }
+
+  /**
+   * Stores each of the given entities in turn.  Failures of individual
+   * entities are reported as errors in the returned response.
+   */
+  @Override
+  public TimelinePutResponse put(TimelineEntities entities) {
+    final TimelinePutResponse result = new TimelinePutResponse();
+    for (TimelineEntity toStore : entities.getEntities()) {
+      put(toStore, result);
+    }
+    return result;
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts
+   * the timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}).
+   *
+   * A start time found in the cache always wins.  Otherwise, if no start
+   * time is provided, it is read from the db or, for a put, derived from
+   * the earliest of the provided events.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @param startTime The start time of the entity, or null
+   * @param events A list of events for the entity, or null
+   * @param writeBatch A leveldb write batch, if the method is called by a
+   *                   put as opposed to a get
+   * @return A byte array, or null if no start time could be determined
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType,
+      Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // read the cache exactly once: with separate containsKey/get calls the
+    // entry could be evicted in between, yielding a null start time
+    Long cached = startTimeCache.get(entity);
+    if (startTime == null) {
+      // start time is not provided, so try to look it up
+      if (cached != null) {
+        // found the start time in the cache
+        startTime = cached;
+      } else {
+        // try to look up the start time in the db
+        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+        byte[] v = db.get(b);
+        if (v == null) {
+          // did not find the start time in the db
+          // if this is a put, try to set it from the provided events
+          if (events == null || events.isEmpty() || writeBatch == null) {
+            // no events to derive a start time from, or not a put, so
+            // return null
+            return null;
+          }
+          long min = Long.MAX_VALUE;
+          for (TimelineEvent e : events) {
+            min = Math.min(min, e.getTimestamp());
+          }
+          startTime = min;
+          // selected start time as minimum timestamp of provided events
+          // write start time to db and cache
+          writeBatch.put(b, writeReverseOrderedLong(startTime));
+          startTimeCache.put(entity, startTime);
+        } else {
+          // found the start time in the db
+          startTime = readReverseOrderedLong(v, 0);
+          if (writeBatch != null) {
+            // if this is a put, re-add the start time to the cache
+            startTimeCache.put(entity, startTime);
+          }
+        }
+      }
+    } else if (cached != null) {
+      // start time is provided, but the cache already has one: use the
+      // cached value, whether or not it differs from the provided one
+      // TODO: verify start time in db as well as cache?
+      startTime = cached;
+    } else if (writeBatch != null) {
+      // if this is a put, write the provided start time to the db and the
+      // cache
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      writeBatch.put(b, writeReverseOrderedLong(startTime));
+      startTimeCache.put(entity, startTime);
+    }
+    return writeReverseOrderedLong(startTime);
+  }
+
+  /**
+   * Builds the key under which an entity's start time is stored:
+   * START_TIME_LOOKUP_PREFIX + entitytype + entity.
+   */
+  private static byte[] createStartTimeLookupKey(String entity,
+      String entitytype) throws IOException {
+    KeyBuilder lookupKey = KeyBuilder.newInstance();
+    lookupKey.add(START_TIME_LOOKUP_PREFIX);
+    lookupKey.add(entitytype);
+    lookupKey.add(entity);
+    return lookupKey.getBytes();
+  }
+
+  /**
+   * Builds the index key for an existing key:
+   * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
+   */
+  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+      Object primaryFilterValue, byte[] key) throws IOException {
+    KeyBuilder indexKey = KeyBuilder.newInstance();
+    indexKey.add(INDEXED_ENTRY_PREFIX);
+    indexKey.add(primaryFilterName);
+    // the serialized value is followed by a separator, like a string
+    byte[] serializedValue = GenericObjectMapper.write(primaryFilterValue);
+    indexKey.add(serializedValue, true);
+    indexKey.add(key);
+    return indexKey.getBytes();
+  }
+
+  /**
+   * Builds the key of an event entry: ENTITY_ENTRY_PREFIX + entitytype +
+   * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
+   */
+  private static byte[] createEntityEventKey(String entity, String entitytype,
+      byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
+      throws IOException {
+    KeyBuilder eventKey = KeyBuilder.newInstance();
+    eventKey.add(ENTITY_ENTRY_PREFIX);
+    eventKey.add(entitytype);
+    eventKey.add(revStartTime);
+    eventKey.add(entity);
+    eventKey.add(TIME_COLUMN);
+    eventKey.add(reveventtimestamp);
+    eventKey.add(eventtype);
+    return eventKey.getBytes();
+  }
+
+  /**
+   * Parses an event from an event entry.  The timestamp and event type are
+   * read from the key starting at the given offset; the event info is
+   * deserialized from the value.
+   *
+   * @param eventTypes The accepted event types, or null to accept all
+   * @return The event, or null if its type is not accepted
+   * @throws IOException if the key cannot be parsed or the value does not
+   *                     deserialize to a map (or null)
+   */
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes, byte[] key,
+      int offset, byte[] value) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    long timestamp = parser.getNextLong();
+    String type = parser.getNextString();
+    if (eventTypes != null && !eventTypes.contains(type)) {
+      // filtered out by event type
+      return null;
+    }
+
+    TimelineEvent parsed = new TimelineEvent();
+    parsed.setTimestamp(timestamp);
+    parsed.setEventType(type);
+    Object info = GenericObjectMapper.read(value);
+    if (info != null && !(info instanceof Map)) {
+      throw new IOException("Couldn't deserialize event info map");
+    }
+    // info is either null or a map at this point
+    @SuppressWarnings("unchecked")
+    Map<String, Object> infoMap = (Map<String, Object>) info;
+    parsed.setEventInfo(infoMap);
+    return parsed;
+  }
+
+  /**
+   * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
+   * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value.
+   */
+  private static byte[] createPrimaryFilterKey(String entity,
+      String entitytype, byte[] revStartTime, String name, Object value)
+      throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+        .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
+        .add(GenericObjectMapper.write(value)).getBytes();
+  }
+
+  /**
+   * Reads a primary filter (name followed by serialized value) out of the
+   * given key, starting at the given offset, and attaches it to the entity.
+   *
+   * @param entity the entity receiving the primary filter
+   * @param key the stored key
+   * @param offset position in the key at which the filter name begins
+   * @throws IOException if parsing the name or deserializing the value fails
+   */
+  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    String filterName = parser.getNextString();
+    // the value occupies the key from wherever the parser stopped
+    int valueOffset = parser.getOffset();
+    Object filterValue = GenericObjectMapper.read(key, valueOffset);
+    entity.addPrimaryFilter(filterName, filterValue);
+  }
+
+  /**
+   * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
+   * revstarttime + entity + OTHER_INFO_COLUMN + name.
+   */
+  private static byte[] createOtherInfoKey(String entity, String entitytype,
+      byte[] revStartTime, String name) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+        .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
+        .getBytes();
+  }
+
+  /**
+   * Creates a string representation of the byte array from the given offset
+   * to the end of the array (for parsing other info keys).
+   */
+  private static String parseRemainingKey(byte[] b, int offset) {
+    return new String(b, offset, b.length - offset);
+  }
+
+  /**
+   * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
+   * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
+   * relatedentity.
+   */
+  private static byte[] createReleatedEntityKey(String entity,
+      String entitytype, byte[] revStartTime, String relatedEntity,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
+        .add(revStartTime).add(entity).add(RELATED_COLUMN)
+        .add(relatedEntityType).add(relatedEntity).getBytes();
+  }
+
+  /**
+   * Reads a related entity (type followed by id) out of the given key,
+   * starting at the given offset, and attaches it to the entity.
+   *
+   * @param entity the entity receiving the related entity
+   * @param key the stored key
+   * @param offset position in the key at which the related type begins
+   * @throws IOException if parsing the key fails
+   */
+  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    // order matters: the type is stored ahead of the id
+    String relatedType = parser.getNextString();
+    String relatedId = parser.getNextString();
+    entity.addRelatedEntity(relatedType, relatedId);
+  }
+
+  /**
+   * Empties the start time cache. Exposed for tests only, so that they can
+   * exercise reloading start times from leveldb.
+   */
+  @VisibleForTesting
+  void clearStartTimeCache() {
+    this.startTimeCache.clear();
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,306 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+
+/**
+ * In-memory implementation of {@link TimelineStore}, intended for tests only.
+ * All data lives in a map owned by the instance, so a caller that writes
+ * through one instance and reads through another will not find its data.
+ * The backing map is a plain {@link HashMap}; no synchronization is done
+ * here.
+ */
+@Private
+@Unstable
+public class MemoryTimelineStore
+    extends AbstractService implements TimelineStore {
+
+  /** Every stored entity, keyed by its (id, type) identifier. */
+  private final Map<EntityIdentifier, TimelineEntity> entities =
+      new HashMap<EntityIdentifier, TimelineEntity>();
+
+  public MemoryTimelineStore() {
+    super(MemoryTimelineStore.class.getName());
+  }
+
+  /**
+   * Returns up to {@code limit} entities of the given type whose start time
+   * lies in (windowStart, windowEnd], in the natural order of
+   * {@link TimelineEntity}. Null arguments fall back to
+   * {@link #DEFAULT_LIMIT}, an unbounded window and all fields respectively.
+   *
+   * @param entityType the type of entities to return
+   * @param limit maximum number of entities, or null for the default
+   * @param windowStart exclusive lower bound on the start time, or null
+   * @param windowEnd inclusive upper bound on the start time, or null
+   * @param primaryFilter if not null, only entities having this primary
+   *          filter are returned
+   * @param secondaryFilters if not null, only entities matching every
+   *          non-null filter, in their primary filters or their other info,
+   *          are returned
+   * @param fields the fields to expose, or null for all of them
+   * @return the matching entities, reduced to the requested fields
+   */
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields) {
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+    // The iterator of a PriorityQueue does not visit elements in sorted
+    // order, so sort an explicit copy instead; otherwise the limit below
+    // could cut off entities that belong at the front of the result.
+    List<TimelineEntity> candidates =
+        new ArrayList<TimelineEntity>(entities.values());
+    Collections.sort(candidates);
+    List<TimelineEntity> entitiesSelected = new ArrayList<TimelineEntity>();
+    for (TimelineEntity entity : candidates) {
+      if (entitiesSelected.size() >= limit) {
+        break;
+      }
+      if (!entity.getEntityType().equals(entityType)) {
+        continue;
+      }
+      if (entity.getStartTime() <= windowStart) {
+        continue;
+      }
+      if (entity.getStartTime() > windowEnd) {
+        continue;
+      }
+      if (primaryFilter != null &&
+          !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) {
+        continue;
+      }
+      if (secondaryFilters != null &&
+          !matchSecondaryFilters(entity, secondaryFilters)) {
+        continue;
+      }
+      entitiesSelected.add(entity);
+    }
+    List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
+    for (TimelineEntity entitySelected : entitiesSelected) {
+      entitiesToReturn.add(maskFields(entitySelected, fields));
+    }
+    Collections.sort(entitiesToReturn);
+    TimelineEntities entitiesWrapper = new TimelineEntities();
+    entitiesWrapper.setEntities(entitiesToReturn);
+    return entitiesWrapper;
+  }
+
+  /**
+   * Looks up a single entity by id and type.
+   *
+   * @param entityId the entity id
+   * @param entityType the entity type
+   * @param fieldsToRetrieve the fields to expose, or null for all of them
+   * @return a copy of the entity reduced to the requested fields, or null if
+   *         no such entity is stored
+   */
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fieldsToRetrieve) {
+    if (fieldsToRetrieve == null) {
+      fieldsToRetrieve = EnumSet.allOf(Field.class);
+    }
+    TimelineEntity entity =
+        entities.get(new EntityIdentifier(entityId, entityType));
+    if (entity == null) {
+      return null;
+    }
+    return maskFields(entity, fieldsToRetrieve);
+  }
+
+  /**
+   * Collects, for each requested entity that is stored, up to {@code limit}
+   * events whose timestamp lies in (windowStart, windowEnd] and whose type
+   * is accepted. Entities that are not stored are skipped; stored entities
+   * without any matching event still contribute an empty entry.
+   *
+   * @param entityType the type shared by all requested entities
+   * @param entityIds the entity ids; null yields an empty result
+   * @param limit maximum number of events per entity, or null for
+   *          {@link #DEFAULT_LIMIT}
+   * @param windowStart exclusive lower bound on the timestamp, or null
+   * @param windowEnd inclusive upper bound on the timestamp, or null
+   * @param eventTypes accepted event types, or null to accept all
+   * @return the selected events grouped by entity
+   */
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd,
+      Set<String> eventTypes) {
+    TimelineEvents allEvents = new TimelineEvents();
+    if (entityIds == null) {
+      return allEvents;
+    }
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    if (windowStart == null) {
+      windowStart = Long.MIN_VALUE;
+    }
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    for (String entityId : entityIds) {
+      EntityIdentifier entityID = new EntityIdentifier(entityId, entityType);
+      TimelineEntity entity = entities.get(entityID);
+      if (entity == null) {
+        continue;
+      }
+      EventsOfOneEntity events = new EventsOfOneEntity();
+      events.setEntityId(entityId);
+      events.setEntityType(entityType);
+      List<TimelineEvent> storedEvents = entity.getEvents();
+      // an entity may carry no event list at all; treat that as "no events"
+      if (storedEvents != null) {
+        for (TimelineEvent event : storedEvents) {
+          if (events.getEvents().size() >= limit) {
+            break;
+          }
+          if (event.getTimestamp() <= windowStart) {
+            continue;
+          }
+          if (event.getTimestamp() > windowEnd) {
+            continue;
+          }
+          if (eventTypes != null
+              && !eventTypes.contains(event.getEventType())) {
+            continue;
+          }
+          events.addEvent(event);
+        }
+      }
+      allEvents.addEvent(events);
+    }
+    return allEvents;
+  }
+
+  /**
+   * Stores the given entities, merging each one into an already stored
+   * entity with the same id and type if there is one. An entity that has
+   * neither a start time nor any event is rejected with a
+   * {@link TimelinePutError#NO_START_TIME} error in the response. Every
+   * entity named as related is linked back to the stored entity and is
+   * created as a placeholder if it does not exist yet.
+   *
+   * @param data the entities to store; null, or a null entity list, stores
+   *          nothing
+   * @return a response listing the entities that could not be stored
+   */
+  @Override
+  public TimelinePutResponse put(TimelineEntities data) {
+    TimelinePutResponse response = new TimelinePutResponse();
+    if (data == null || data.getEntities() == null) {
+      return response;
+    }
+    for (TimelineEntity entity : data.getEntities()) {
+      EntityIdentifier entityId =
+          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
+      // store entity info in memory
+      TimelineEntity existingEntity = entities.get(entityId);
+      if (existingEntity == null) {
+        existingEntity = new TimelineEntity();
+        existingEntity.setEntityId(entity.getEntityId());
+        existingEntity.setEntityType(entity.getEntityType());
+        existingEntity.setStartTime(entity.getStartTime());
+        entities.put(entityId, existingEntity);
+      }
+      if (entity.getEvents() != null) {
+        if (existingEntity.getEvents() == null) {
+          existingEntity.setEvents(entity.getEvents());
+        } else {
+          existingEntity.addEvents(entity.getEvents());
+        }
+        Collections.sort(existingEntity.getEvents());
+      }
+      // check startTime
+      if (existingEntity.getStartTime() == null) {
+        List<TimelineEvent> storedEvents = existingEntity.getEvents();
+        if (storedEvents == null || storedEvents.isEmpty()) {
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entityId.getId());
+          error.setEntityType(entityId.getType());
+          error.setErrorCode(TimelinePutError.NO_START_TIME);
+          response.addError(error);
+          entities.remove(entityId);
+          continue;
+        } else {
+          // The entity starts with its earliest event. Scan for the minimum
+          // rather than relying on a list position: index 0 is what
+          // LAST_EVENT_ONLY reports as the most recent event.
+          long earliest = Long.MAX_VALUE;
+          for (TimelineEvent event : storedEvents) {
+            earliest = Math.min(earliest, event.getTimestamp());
+          }
+          existingEntity.setStartTime(earliest);
+        }
+      }
+      if (entity.getPrimaryFilters() != null) {
+        if (existingEntity.getPrimaryFilters() == null) {
+          existingEntity.setPrimaryFilters(entity.getPrimaryFilters());
+        } else {
+          existingEntity.addPrimaryFilters(entity.getPrimaryFilters());
+        }
+      }
+      if (entity.getOtherInfo() != null) {
+        if (existingEntity.getOtherInfo() == null) {
+          existingEntity.setOtherInfo(entity.getOtherInfo());
+        } else {
+          existingEntity.addOtherInfo(entity.getOtherInfo());
+        }
+      }
+      // relate it to other entities
+      if (entity.getRelatedEntities() == null) {
+        continue;
+      }
+      for (Map.Entry<String, Set<String>> partRelatedEntities : entity
+          .getRelatedEntities().entrySet()) {
+        if (partRelatedEntities == null
+            || partRelatedEntities.getValue() == null) {
+          continue;
+        }
+        for (String idStr : partRelatedEntities.getValue()) {
+          EntityIdentifier relatedEntityId =
+              new EntityIdentifier(idStr, partRelatedEntities.getKey());
+          TimelineEntity relatedEntity = entities.get(relatedEntityId);
+          if (relatedEntity != null) {
+            relatedEntity.addRelatedEntity(
+                existingEntity.getEntityType(), existingEntity.getEntityId());
+          } else {
+            // placeholder for an entity that was never put itself; it
+            // inherits the start time of the entity that references it
+            relatedEntity = new TimelineEntity();
+            relatedEntity.setEntityId(relatedEntityId.getId());
+            relatedEntity.setEntityType(relatedEntityId.getType());
+            relatedEntity.setStartTime(existingEntity.getStartTime());
+            relatedEntity.addRelatedEntity(existingEntity.getEntityType(),
+                existingEntity.getEntityId());
+            entities.put(relatedEntityId, relatedEntity);
+          }
+        }
+      }
+    }
+    return response;
+  }
+
+  /**
+   * Creates a copy of the entity that exposes only the requested fields; id,
+   * type and start time are always copied. With
+   * {@link Field#LAST_EVENT_ONLY} (and without {@link Field#EVENTS}) only the
+   * first stored event is exposed, or no events at all if the entity has
+   * none.
+   */
+  private static TimelineEntity maskFields(
+      TimelineEntity entity, EnumSet<Field> fields) {
+    // Conceal the fields that are not going to be exposed
+    TimelineEntity entityToReturn = new TimelineEntity();
+    entityToReturn.setEntityId(entity.getEntityId());
+    entityToReturn.setEntityType(entity.getEntityType());
+    entityToReturn.setStartTime(entity.getStartTime());
+    List<TimelineEvent> events = entity.getEvents();
+    if (fields.contains(Field.EVENTS)) {
+      entityToReturn.setEvents(events);
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)
+        && events != null && !events.isEmpty()) {
+      entityToReturn.setEvents(Arrays.asList(events.get(0)));
+    } else {
+      // nothing requested, or no event to pick (e.g. a placeholder entity)
+      entityToReturn.setEvents(null);
+    }
+    entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ?
+        entity.getRelatedEntities() : null);
+    entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ?
+        entity.getPrimaryFilters() : null);
+    entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ?
+        entity.getOtherInfo() : null);
+    return entityToReturn;
+  }
+
+  /**
+   * Tells whether the entity satisfies every non-null secondary filter. A
+   * filter is satisfied if it matches either the primary filters or the
+   * other info of the entity.
+   */
+  private static boolean matchSecondaryFilters(TimelineEntity entity,
+      Collection<NameValuePair> secondaryFilters) {
+    for (NameValuePair secondaryFilter : secondaryFilters) {
+      if (secondaryFilter == null) {
+        continue;
+      }
+      if (!matchPrimaryFilter(entity.getPrimaryFilters(), secondaryFilter)
+          && !matchFilter(entity.getOtherInfo(), secondaryFilter)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Tells whether the map holds exactly the filter's value under the
+   * filter's name. A null map matches nothing.
+   */
+  private static boolean matchFilter(Map<String, Object> tags,
+      NameValuePair filter) {
+    if (tags == null) {
+      return false;
+    }
+    Object value = tags.get(filter.getName());
+    // a missing entry never matches; otherwise the values must be equal
+    return value != null && value.equals(filter.getValue());
+  }
+
+  /**
+   * Tells whether the set stored under the filter's name contains the
+   * filter's value. A null map matches nothing.
+   */
+  private static boolean matchPrimaryFilter(Map<String, Set<Object>> tags,
+      NameValuePair filter) {
+    if (tags == null) {
+      return false;
+    }
+    Set<Object> value = tags.get(filter.getName());
+    if (value == null) { // doesn't have the filter
+      return false;
+    }
+    return value.contains(filter.getValue());
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A class holding a name and value pair, used for specifying filters in
+ * {@link TimelineReader}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class NameValuePair {
+  String name;
+  Object value;
+
+  public NameValuePair(String name, Object value) {
+    this.name = name;
+    this.value = value;
+  }
+
+  /**
+   * Get the name.
+   * @return The name.
+   */
+  public String getName() {
+
+    return name;
+  }
+
+  /**
+   * Get the value.
+   * @return The value.
+   */
+  public Object getValue() {
+    return value;
+  }
+
+  @Override
+  public String toString() {
+    return "{ name: " + name + ", value: " + value + " }";
+  }
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.SortedSet;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+
+/**
+ * Read-side contract of the timeline store: retrieval of entities and of
+ * their events.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineReader {
+
+  /**
+   * The parts of an entity that {@link #getEntities} and {@link #getEntity}
+   * can be asked to retrieve.
+   */
+  enum Field {
+    EVENTS,
+    RELATED_ENTITIES,
+    PRIMARY_FILTERS,
+    OTHER_INFO,
+    LAST_EVENT_ONLY
+  }
+
+  /**
+   * Limit used by {@link #getEntities} and {@link #getEntityTimelines} when
+   * the caller does not specify one.
+   */
+  long DEFAULT_LIMIT = 100;
+
+  /**
+   * Retrieves a list of entities ({@link TimelineEntity}), ordered by their
+   * starting timestamp, descending.
+   *
+   * @param entityType
+   *          Type of the entities to return (required).
+   * @param limit
+   *          Maximum number of entities to return; null means
+   *          {@link #DEFAULT_LIMIT}.
+   * @param windowStart
+   *          Earliest start timestamp to retrieve (exclusive); null means
+   *          entities are retrieved until the limit is reached.
+   * @param windowEnd
+   *          Latest start timestamp to retrieve (inclusive); null means
+   *          {@link Long#MAX_VALUE}.
+   * @param primaryFilter
+   *          Restricts the result to entities that have the given primary
+   *          filter; null retrieves all entities. This is an indexed
+   *          retrieval: entities that do not match the filter are not
+   *          scanned.
+   * @param secondaryFilters
+   *          Restricts the result to entities that have exact matches for
+   *          all the given filters in their primary filters or other info.
+   *          This is not an indexed retrieval: all entities are scanned, but
+   *          only the matching ones are returned.
+   * @param fieldsToRetrieve
+   *          The fields of the entity object to retrieve (see
+   *          {@link Field}); null retrieves all fields. If the set contains
+   *          {@link Field#LAST_EVENT_ONLY} but not {@link Field#EVENTS}, the
+   *          most recent event of each entity is retrieved.
+   * @return A {@link TimelineEntities} object.
+   * @throws IOException
+   */
+  TimelineEntities getEntities(
+      String entityType,
+      Long limit,
+      Long windowStart,
+      Long windowEnd,
+      NameValuePair primaryFilter,
+      Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * Retrieves the information stored for one entity.
+   *
+   * @param entityId
+   *          Id of the entity to retrieve.
+   * @param entityType
+   *          Type of the entity.
+   * @param fieldsToRetrieve
+   *          The fields of the entity object to retrieve (see
+   *          {@link Field}); null retrieves all fields. If the set contains
+   *          {@link Field#LAST_EVENT_ONLY} but not {@link Field#EVENTS}, the
+   *          most recent event of the entity is retrieved.
+   * @return A {@link TimelineEntity} object.
+   * @throws IOException
+   */
+  TimelineEntity getEntity(
+      String entityId,
+      String entityType,
+      EnumSet<Field> fieldsToRetrieve) throws IOException;
+
+  /**
+   * Retrieves the events of a list of entities that all share one entity
+   * type. The events of each entity are ordered by timestamp, descending.
+   *
+   * @param entityType
+   *          Type of the entities to retrieve events for.
+   * @param entityIds
+   *          Ids of the entities to retrieve events for.
+   * @param limit
+   *          Maximum number of events to return per entity; null means
+   *          {@link #DEFAULT_LIMIT} events per entity.
+   * @param windowStart
+   *          If not null, only events later than this time (exclusive) are
+   *          retrieved.
+   * @param windowEnd
+   *          If not null, only events earlier than this time (inclusive) are
+   *          retrieved.
+   * @param eventTypes
+   *          Restricts the returned events to the given types; null returns
+   *          events of all types.
+   * @return A {@link TimelineEvents} object.
+   * @throws IOException
+   */
+  TimelineEvents getEntityTimelines(
+      String entityType,
+      SortedSet<String> entityIds,
+      Long limit,
+      Long windowStart,
+      Long windowEnd,
+      Set<String> eventTypes) throws IOException;
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.Service;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineStore extends
+    Service, TimelineReader, TimelineWriter {
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+import java.io.IOException;
+
+/**
+ * Write-side contract of the timeline store.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface TimelineWriter {
+
+  /**
+   * Writes the given entity information to the timeline store. Errors that
+   * concern individual objects of the put request are reported through the
+   * response.
+   *
+   * @param data
+   *          A {@link TimelineEntities} object.
+   * @return A {@link TimelinePutResponse} object.
+   * @throws IOException
+   */
+  TimelinePutResponse put(
+      TimelineEntities data) throws IOException;
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+import org.apache.hadoop.classification.InterfaceAudience;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1570922&r1=1570921&r2=1570922&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Sat Feb 22 20:55:06 2014
@@ -21,7 +21,7 @@ import static org.apache.hadoop.yarn.uti
 
 import org.apache.hadoop.yarn.server.api.ApplicationContext;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
@@ -30,22 +30,22 @@ import org.apache.hadoop.yarn.webapp.Yar
 public class AHSWebApp extends WebApp implements YarnWebParams {
 
   private final ApplicationHistoryManager applicationHistoryManager;
-  private final ApplicationTimelineStore applicationTimelineStore;
+  private final TimelineStore timelineStore;
 
   public AHSWebApp(ApplicationHistoryManager applicationHistoryManager,
-      ApplicationTimelineStore applicationTimelineStore) {
+      TimelineStore timelineStore) {
     this.applicationHistoryManager = applicationHistoryManager;
-    this.applicationTimelineStore = applicationTimelineStore;
+    this.timelineStore = timelineStore;
   }
 
   @Override
   public void setup() {
     bind(YarnJacksonJaxbJsonProvider.class);
     bind(AHSWebServices.class);
-    bind(ATSWebServices.class);
+    bind(TimelineWebServices.class);
     bind(GenericExceptionHandler.class);
     bind(ApplicationContext.class).toInstance(applicationHistoryManager);
-    bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore);
+    bind(TimelineStore.class).toInstance(timelineStore);
     route("/", AHSController.class);
     route(pajoin("/apps", APP_STATE), AHSController.class);
     route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,328 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
+import org.apache.hadoop.yarn.webapp.BadRequestException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * REST endpoints of the timeline service, rooted at {@code /ws/v1/timeline}.
+ * Each request is answered from the injected {@link TimelineStore}; this class
+ * converts the raw string parameters into the typed arguments passed to the
+ * store and maps failures onto HTTP error responses.
+ */
+@Singleton
+@Path("/ws/v1/timeline")
+//TODO: support XML serialization/deserialization
+public class TimelineWebServices {
+
+  private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
+
+  private final TimelineStore store;
+
+  /**
+   * @param store
+   *          the timeline store that backs every endpoint of this class
+   */
+  @Inject
+  public TimelineWebServices(TimelineStore store) {
+    this.store = store;
+  }
+
+  /**
+   * Response body of the root endpoint: a single description string exposed
+   * as the {@code About} element.
+   */
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    private String about;
+
+    /** No-argument constructor; leaves the description unset. */
+    public AboutInfo() {
+
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   *
+   * @return an {@link AboutInfo} carrying the text {@code "Timeline API"}
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Return a list of entities that match the given parameters.
+   *
+   * @param entityType
+   *          entity type taken from the request path
+   * @param primaryFilter
+   *          optional filter in the form {@code name:value}
+   * @param secondaryFilter
+   *          optional comma-separated list of {@code name:value} filters
+   * @param windowStart
+   *          optional numeric value, parsed as a long
+   * @param windowEnd
+   *          optional numeric value, parsed as a long
+   * @param limit
+   *          optional numeric value, parsed as a long
+   * @param fields
+   *          optional comma-separated field names (events, lasteventonly,
+   *          relatedentities, primaryfilters, otherinfo; case-insensitive)
+   * @return the entities returned by the store, or an empty
+   *         {@link TimelineEntities} when the store returns {@code null}
+   * @throws BadRequestException
+   *           if a numeric parameter cannot be parsed, a filter is not a
+   *           {@code name:value} pair, or an IllegalArgumentException is
+   *           raised while querying
+   * @throws WebApplicationException
+   *           with status INTERNAL_SERVER_ERROR if the store throws an
+   *           IOException
+   */
+  @GET
+  @Path("/{entityType}")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEntities getEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @QueryParam("primaryFilter") String primaryFilter,
+      @QueryParam("secondaryFilter") String secondaryFilter,
+      @QueryParam("windowStart") String windowStart,
+      @QueryParam("windowEnd") String windowEnd,
+      @QueryParam("limit") String limit,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineEntities entities = null;
+    try {
+      entities = store.getEntities(
+          parseStr(entityType),
+          parseLongStr(limit),
+          parseLongStr(windowStart),
+          parseLongStr(windowEnd),
+          parsePairStr(primaryFilter, ":"),
+          parsePairsStr(secondaryFilter, ",", ":"),
+          parseFieldsStr(fields, ","));
+    } catch (NumberFormatException e) {
+      // Must precede the IllegalArgumentException clause: it is a subclass.
+      throw new BadRequestException(
+          "windowStart, windowEnd or limit is not a numeric value.");
+    } catch (IllegalArgumentException e) {
+      // NOTE(review): parseFieldsStr ignores unknown names, so this is
+      // presumably reached only when the store rejects an argument - confirm.
+      throw new BadRequestException("requested invalid field.");
+    } catch (IOException e) {
+      LOG.error("Error getting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (entities == null) {
+      return new TimelineEntities();
+    }
+    return entities;
+  }
+
+  /**
+   * Return a single entity of the given entity type and Id.
+   *
+   * @param entityType
+   *          entity type taken from the request path
+   * @param entityId
+   *          entity id taken from the request path
+   * @param fields
+   *          optional comma-separated field names, as for
+   *          {@code getEntities}
+   * @return the entity returned by the store
+   * @throws BadRequestException
+   *           if an IllegalArgumentException is raised while querying
+   * @throws WebApplicationException
+   *           with status NOT_FOUND if the store returns {@code null}, or
+   *           INTERNAL_SERVER_ERROR if the store throws an IOException
+   */
+  @GET
+  @Path("/{entityType}/{entityId}")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEntity getEntity(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @PathParam("entityId") String entityId,
+      @QueryParam("fields") String fields) {
+    init(res);
+    TimelineEntity entity = null;
+    try {
+      // The store takes the id first and the type second.
+      entity =
+          store.getEntity(parseStr(entityId), parseStr(entityType),
+              parseFieldsStr(fields, ","));
+    } catch (IllegalArgumentException e) {
+      throw new BadRequestException(
+          "requested invalid field.");
+    } catch (IOException e) {
+      LOG.error("Error getting entity", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (entity == null) {
+      throw new WebApplicationException(Response.Status.NOT_FOUND);
+    }
+    return entity;
+  }
+
+  /**
+   * Return the events that match the given parameters.
+   *
+   * @param entityType
+   *          entity type taken from the request path
+   * @param entityId
+   *          optional comma-separated list of entity ids
+   * @param eventType
+   *          optional comma-separated list of event types
+   * @param windowStart
+   *          optional numeric value, parsed as a long
+   * @param windowEnd
+   *          optional numeric value, parsed as a long
+   * @param limit
+   *          optional numeric value, parsed as a long
+   * @return the events returned by the store, or an empty
+   *         {@link TimelineEvents} when the store returns {@code null}
+   * @throws BadRequestException
+   *           if a numeric parameter cannot be parsed
+   * @throws WebApplicationException
+   *           with status INTERNAL_SERVER_ERROR if the store throws an
+   *           IOException
+   */
+  @GET
+  @Path("/{entityType}/events")
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelineEvents getEvents(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("entityType") String entityType,
+      @QueryParam("entityId") String entityId,
+      @QueryParam("eventType") String eventType,
+      @QueryParam("windowStart") String windowStart,
+      @QueryParam("windowEnd") String windowEnd,
+      @QueryParam("limit") String limit) {
+    init(res);
+    TimelineEvents events = null;
+    try {
+      events = store.getEntityTimelines(
+          parseStr(entityType),
+          parseArrayStr(entityId, ","),
+          parseLongStr(limit),
+          parseLongStr(windowStart),
+          parseLongStr(windowEnd),
+          parseArrayStr(eventType, ","));
+    } catch (NumberFormatException e) {
+      throw new BadRequestException(
+          "windowStart, windowEnd or limit is not a numeric value.");
+    } catch (IOException e) {
+      LOG.error("Error getting entity timelines", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+    if (events == null) {
+      return new TimelineEvents();
+    }
+    return events;
+  }
+
+  /**
+   * Store the given entities into the timeline store, and return the errors
+   * that happen during storing.
+   *
+   * @param entities
+   *          the entities read from the request body; may be {@code null}
+   * @return the response of the store, or an empty
+   *         {@link TimelinePutResponse} when {@code entities} is
+   *         {@code null}
+   * @throws WebApplicationException
+   *           with status INTERNAL_SERVER_ERROR if the store throws an
+   *           IOException
+   */
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      TimelineEntities entities) {
+    init(res);
+    if (entities == null) {
+      return new TimelinePutResponse();
+    }
+    try {
+      return store.put(entities);
+    } catch (IOException e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets the content type of the response at the start of every request.
+   */
+  private void init(HttpServletResponse response) {
+    // NOTE(review): presumably clears a content type set earlier in the
+    // request chain so that the @Produces media type is used - confirm.
+    response.setContentType(null);
+  }
+
+  /**
+   * Splits {@code str} on {@code delimiter} (a regular expression) into a
+   * sorted set of trimmed tokens.
+   *
+   * @return the tokens, or {@code null} if {@code str} is {@code null}
+   */
+  private static SortedSet<String> parseArrayStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    SortedSet<String> strSet = new TreeSet<String>();
+    String[] strs = str.split(delimiter);
+    for (String aStr : strs) {
+      strSet.add(aStr.trim());
+    }
+    return strSet;
+  }
+
+  /**
+   * Parses a {@code name<delimiter>value} string into a pair. Only the first
+   * occurrence of the delimiter splits the string, so the value itself may
+   * contain the delimiter.
+   *
+   * @return the pair with both parts trimmed, or {@code null} if
+   *         {@code str} is {@code null}
+   * @throws BadRequestException
+   *           if {@code str} does not contain the delimiter
+   */
+  private static NameValuePair parsePairStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(delimiter, 2);
+    if (strs.length < 2) {
+      // Without this check strs[1] raised ArrayIndexOutOfBoundsException,
+      // which no endpoint catches, so a malformed filter was not reported
+      // to the client as a bad request.
+      throw new BadRequestException(
+          "primaryFilter or secondaryFilter is not in the form name"
+              + delimiter + "value.");
+    }
+    return new NameValuePair(strs[0].trim(), strs[1].trim());
+  }
+
+  /**
+   * Parses a list of pairs: {@code str} is split on {@code aDelimiter} and
+   * every element is parsed with {@link #parsePairStr(String, String)} using
+   * {@code pDelimiter}.
+   *
+   * @return the parsed pairs, or {@code null} if {@code str} is
+   *         {@code null}
+   * @throws BadRequestException
+   *           if any element is not a well-formed pair
+   */
+  private static Collection<NameValuePair> parsePairsStr(
+      String str, String aDelimiter, String pDelimiter) {
+    if (str == null) {
+      return null;
+    }
+    String[] strs = str.split(aDelimiter);
+    Set<NameValuePair> pairs = new HashSet<NameValuePair>();
+    for (String aStr : strs) {
+      pairs.add(parsePairStr(aStr, pDelimiter));
+    }
+    return pairs;
+  }
+
+  /**
+   * Parses a delimited list of field names into the matching {@link Field}
+   * values. Names are compared case-insensitively and independently of the
+   * default locale.
+   *
+   * @return the requested fields, or {@code null} if {@code str} is
+   *         {@code null} or names no known field
+   */
+  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
+    if (str == null) {
+      return null;
+    }
+    EnumSet<Field> fields = EnumSet.noneOf(Field.class);
+    for (String s : str.split(delimiter)) {
+      s = s.trim();
+      // equalsIgnoreCase instead of toUpperCase(): upper-casing with the
+      // default locale turns 'i' into a dotted capital I under a Turkish
+      // locale, so "otherinfo" and "primaryfilters" would never match.
+      if ("EVENTS".equalsIgnoreCase(s)) {
+        fields.add(Field.EVENTS);
+      } else if ("LASTEVENTONLY".equalsIgnoreCase(s)) {
+        fields.add(Field.LAST_EVENT_ONLY);
+      } else if ("RELATEDENTITIES".equalsIgnoreCase(s)) {
+        fields.add(Field.RELATED_ENTITIES);
+      } else if ("PRIMARYFILTERS".equalsIgnoreCase(s)) {
+        fields.add(Field.PRIMARY_FILTERS);
+      } else if ("OTHERINFO".equalsIgnoreCase(s)) {
+        fields.add(Field.OTHER_INFO);
+      }
+      // NOTE(review): any other name is silently ignored; confirm whether it
+      // should be rejected as an invalid field instead.
+    }
+    return fields.isEmpty() ? null : fields;
+  }
+
+  /**
+   * @return the trimmed string parsed as a long, or {@code null} if
+   *         {@code str} is {@code null}
+   * @throws NumberFormatException
+   *           if the trimmed string is not a parsable long
+   */
+  private static Long parseLongStr(String str) {
+    return str == null ? null : Long.parseLong(str.trim());
+  }
+
+  /**
+   * @return the trimmed string, or {@code null} if {@code str} is
+   *         {@code null}
+   */
+  private static String parseStr(String str) {
+    return str == null ? null : str.trim();
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit tests for {@link GenericObjectMapper}: the reverse-ordered long
+ * encoding and the write/read round trip of generic objects.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestGenericObjectMapper {
+
+  /**
+   * Checks the reverse-ordered long encoding for the extreme long values,
+   * zero, several positive values and a negative value.
+   */
+  @Test
+  public void testEncoding() {
+    testEncoding(Long.MAX_VALUE);
+    testEncoding(Long.MIN_VALUE);
+    // Upper-case L suffix: a lower-case 'l' is easily misread as the digit 1.
+    testEncoding(0L);
+    testEncoding(128L);
+    testEncoding(256L);
+    testEncoding(512L);
+    testEncoding(-256L);
+  }
+
+  /**
+   * Verifies for one value that the encoding decodes back to the value, both
+   * from offset 0 and from the middle of a larger buffer, and that the
+   * encodings of the neighbouring values compare in reverse order.
+   */
+  private static void testEncoding(long value) {
+    byte[] b = GenericObjectMapper.writeReverseOrderedLong(value);
+    assertEquals("error decoding", value,
+        GenericObjectMapper.readReverseOrderedLong(b, 0));
+    // Copy the 8 encoded bytes to offset 5 of a 16 byte buffer.
+    byte[] buf = new byte[16];
+    System.arraycopy(b, 0, buf, 5, 8);
+    assertEquals("error decoding at offset", value,
+        GenericObjectMapper.readReverseOrderedLong(buf, 5));
+    // Guard against overflow of value - 1.
+    if (value > Long.MIN_VALUE) {
+      byte[] a = GenericObjectMapper.writeReverseOrderedLong(value - 1);
+      // The smaller number is expected to produce the larger byte string.
+      assertEquals("error preserving ordering", 1,
+          WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
+    }
+    // Guard against overflow of value + 1.
+    if (value < Long.MAX_VALUE) {
+      byte[] c = GenericObjectMapper.writeReverseOrderedLong(value + 1);
+      assertEquals("error preserving ordering", 1,
+          WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
+    }
+  }
+
+  /** Asserts that {@code o} is equal to itself after a write/read cycle. */
+  private static void verify(Object o) throws IOException {
+    assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
+  }
+
+  /**
+   * Round-trips a long, an int, a double, a string, a boolean, a list of
+   * strings and a string-to-string map.
+   */
+  @Test
+  public void testValueTypes() throws IOException {
+    verify(42L);
+    verify(42);
+    verify(1.23);
+    verify("abc");
+    verify(true);
+    List<String> list = new ArrayList<String>();
+    list.add("123");
+    list.add("abc");
+    verify(list);
+    Map<String,String> map = new HashMap<String,String>();
+    map.put("k1","v1");
+    map.put("k2","v2");
+    verify(map);
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java?rev=1570922&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java Sat Feb 22 20:55:06 2014
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Runs the test cases inherited from {@link TimelineStoreTestUtils} against a
+ * {@link LeveldbTimelineStore} whose data lives in a scratch directory below
+ * {@code target}.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestLeveldbTimelineStore
+    extends TimelineStoreTestUtils {
+  private FileContext fsContext;
+  private File fsPath;
+
+  /**
+   * Starts a fresh store on an empty directory and loads the test and
+   * verification data.
+   */
+  @Before
+  public void setup() throws Exception {
+    fsContext = FileContext.getLocalFSFileContext();
+    String dirName = this.getClass().getSimpleName() + "-tmpDir";
+    fsPath = new File("target", dirName).getAbsoluteFile();
+    // Remove whatever an earlier run left behind before the store opens it.
+    deleteStoreDir();
+
+    Configuration conf = new Configuration();
+    String leveldbPath = fsPath.getAbsolutePath();
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, leveldbPath);
+
+    store = new LeveldbTimelineStore();
+    store.init(conf);
+    store.start();
+
+    loadTestData();
+    loadVerificationData();
+  }
+
+  /** Stops the store and removes its directory. */
+  @After
+  public void tearDown() throws Exception {
+    store.stop();
+    deleteStoreDir();
+  }
+
+  /** Recursively deletes the directory that holds the store's data. */
+  private void deleteStoreDir() throws Exception {
+    Path storeDir = new Path(fsPath.getAbsolutePath());
+    fsContext.delete(storeDir, true);
+  }
+
+  /**
+   * Runs the inherited single-entity test twice, clearing the store's start
+   * time cache between the two runs.
+   */
+  @Test
+  public void testGetSingleEntity() throws IOException {
+    super.testGetSingleEntity();
+    LeveldbTimelineStore leveldbStore = (LeveldbTimelineStore) store;
+    leveldbStore.clearStartTimeCache();
+    super.testGetSingleEntity();
+  }
+
+  /** Delegates to the inherited test case. */
+  @Test
+  public void testGetEntities() throws IOException {
+    super.testGetEntities();
+  }
+
+  /** Delegates to the inherited test case. */
+  @Test
+  public void testGetEntitiesWithPrimaryFilters() throws IOException {
+    super.testGetEntitiesWithPrimaryFilters();
+  }
+
+  /** Delegates to the inherited test case. */
+  @Test
+  public void testGetEntitiesWithSecondaryFilters() throws IOException {
+    super.testGetEntitiesWithSecondaryFilters();
+  }
+
+  /** Delegates to the inherited test case. */
+  @Test
+  public void testGetEvents() throws IOException {
+    super.testGetEvents();
+  }
+
+}



Mime
View raw message