Return-Path: X-Original-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-yarn-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id B86BB10FE8 for ; Sat, 22 Feb 2014 20:55:36 +0000 (UTC) Received: (qmail 91809 invoked by uid 500); 22 Feb 2014 20:55:35 -0000 Delivered-To: apmail-hadoop-yarn-commits-archive@hadoop.apache.org Received: (qmail 91765 invoked by uid 500); 22 Feb 2014 20:55:34 -0000 Mailing-List: contact yarn-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: yarn-commits@hadoop.apache.org Delivered-To: mailing list yarn-commits@hadoop.apache.org Received: (qmail 91727 invoked by uid 99); 22 Feb 2014 20:55:33 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Feb 2014 20:55:33 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 22 Feb 2014 20:55:29 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 2A4442388ADA; Sat, 22 Feb 2014 20:55:09 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1570922 [2/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/apptimeline/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timeline... 
Date: Sat, 22 Feb 2014 20:55:07 -0000 To: yarn-commits@hadoop.apache.org From: vinodkv@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140222205509.2A4442388ADA@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,873 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.collections.map.LRUMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WritableComparator; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import 
org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;

import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper.writeReverseOrderedLong;

/**
 * An implementation of a timeline store backed by leveldb.
 *
 * <p>Three kinds of keys are written (0x0 is the separator byte that
 * follows every string element except the last element of a stored key):
 * <ul>
 * <li>Start time lookup: START_TIME_LOOKUP_PREFIX + entity type + 0x0 +
 * entity id, whose value is the entity start time encoded with
 * {@link GenericObjectMapper#writeReverseOrderedLong(long)}.</li>
 * <li>Entity entries: ENTITY_ENTRY_PREFIX + entity type + 0x0 + reverse
 * ordered start time (8 bytes) + entity id + 0x0 + a column byte followed
 * by column-specific data (primary filter, other info, related entity or
 * event).</li>
 * <li>Index entries: INDEXED_ENTRY_PREFIX + primary filter name + 0x0 +
 * serialized primary filter value + 0x0 + an entity entry key.</li>
 * </ul>
 * Because start times and event timestamps are stored reverse ordered,
 * iterating keys in leveldb order visits the most recent entries first.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class LeveldbTimelineStore extends AbstractService
    implements TimelineStore {
  private static final Log LOG = LogFactory
      .getLog(LeveldbTimelineStore.class);

  private static final String FILENAME = "leveldb-timeline-store.ldb";

  // Keys are persisted, so strings must be encoded with a fixed charset
  // rather than the platform default, which can differ between JVMs that
  // open the same database.
  private static final java.nio.charset.Charset UTF_8 =
      java.nio.charset.Charset.forName("UTF-8");

  private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(UTF_8);
  private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(UTF_8);
  private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(UTF_8);

  private static final byte[] PRIMARY_FILTER_COLUMN = "f".getBytes(UTF_8);
  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
  private static final byte[] RELATED_COLUMN = "r".getBytes(UTF_8);
  private static final byte[] TIME_COLUMN = "t".getBytes(UTF_8);

  private static final byte[] EMPTY_BYTES = new byte[0];

  private static final int START_TIME_CACHE_SIZE = 10000;

  // LRUMap is not generic, hence the unchecked conversion.
  @SuppressWarnings("unchecked")
  private final Map<EntityIdentifier, Long> startTimeCache =
      Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));

  private DB db;

  public LeveldbTimelineStore() {
    super(LeveldbTimelineStore.class.getName());
  }

  /**
   * Opens (creating it if missing) the leveldb database located in the
   * directory configured by
   * {@link YarnConfiguration#TIMELINE_SERVICE_LEVELDB_PATH}.
   *
   * @throws IOException if no path is configured or the directory cannot
   *         be created
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    String path = conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
    if (path == null) {
      // fail with a clear message instead of a NullPointerException from
      // new File(null)
      throw new IOException("No path configured for leveldb timeline store ("
          + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH + ")");
    }
    File p = new File(path);
    if (!p.exists() && !p.mkdirs()) {
      throw new IOException("Couldn't create directory for leveldb " +
          "timeline store " + path);
    }
    LOG.info("Using leveldb path " + path);
    Options options = new Options();
    options.createIfMissing(true);
    JniDBFactory factory = new JniDBFactory();
    db = factory.open(new File(path, FILENAME), options);
    super.serviceInit(conf);
  }

  /**
   * Closes the database, logging rather than propagating close failures.
   */
  @Override
  protected void serviceStop() throws Exception {
    IOUtils.cleanup(LOG, db);
    super.serviceStop();
  }

  /**
   * Assembles a db key from a sequence of byte arrays. Elements added as
   * strings (or with {@code sep == true}) are followed by a 0x0 separator.
   * At most {@link #MAX_NUMBER_OF_KEY_ELEMENTS} elements may be added to a
   * builder created by {@link #newInstance()}.
   */
  private static class KeyBuilder {
    private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
    private final byte[][] b;
    private final boolean[] useSeparator;
    private int index;
    private int length;

    public KeyBuilder(int size) {
      b = new byte[size][];
      useSeparator = new boolean[size];
      index = 0;
      length = 0;
    }

    public static KeyBuilder newInstance() {
      return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
    }

    /** Adds a string element, which will be followed by a separator. */
    public KeyBuilder add(String s) {
      return add(s.getBytes(UTF_8), true);
    }

    /** Adds a raw element with no trailing separator. */
    public KeyBuilder add(byte[] t) {
      return add(t, false);
    }

    public KeyBuilder add(byte[] t, boolean sep) {
      b[index] = t;
      useSeparator[index] = sep;
      length += t.length;
      if (sep) {
        length++;
      }
      index++;
      return this;
    }

    /**
     * Returns the key as it is stored: the separator of the final element
     * is omitted.
     */
    public byte[] getBytes() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
      for (int i = 0; i < index; i++) {
        baos.write(b[i]);
        if (i < index - 1 && useSeparator[i]) {
          baos.write(0x0);
        }
      }
      return baos.toByteArray();
    }

    /**
     * Returns the key for use as a lookup prefix: the separator of the
     * final element is kept, so that a prefix ending in a string only
     * matches keys containing exactly that string.
     */
    public byte[] getBytesForLookup() throws IOException {
      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
      for (int i = 0; i < index; i++) {
        baos.write(b[i]);
        if (useSeparator[i]) {
          baos.write(0x0);
        }
      }
      return baos.toByteArray();
    }
  }

  /**
   * Reads strings and longs sequentially from a key, starting at a given
   * offset.
   */
  private static class KeyParser {
    private final byte[] b;
    private int offset;

    public KeyParser(byte[] b, int offset) {
      this.b = b;
      this.offset = offset;
    }

    /**
     * Reads up to the next separator (or the end of the array) and moves
     * past the separator.
     */
    public String getNextString() throws IOException {
      if (offset >= b.length) {
        throw new IOException(
            "tried to read nonexistent string from byte array");
      }
      int i = 0;
      while (offset + i < b.length && b[offset + i] != 0x0) {
        i++;
      }
      String s = new String(b, offset, i, UTF_8);
      offset = offset + i + 1;
      return s;
    }

    /** Reads an 8 byte reverse ordered long. */
    public long getNextLong() throws IOException {
      // a long needs bytes [offset, offset + 8); it may legitimately end
      // exactly at the end of the array
      if (offset + 8 > b.length) {
        throw new IOException("byte array ran out when trying to read long");
      }
      long l = readReverseOrderedLong(b, offset);
      offset += 8;
      return l;
    }

    public int getOffset() {
      return offset;
    }
  }

  @Override
  public TimelineEntity getEntity(String entityId, String entityType,
      EnumSet<Field> fields) throws IOException {
    DBIterator iterator = null;
    try {
      byte[] revStartTime = getStartTime(entityId, entityType, null, null,
          null);
      if (revStartTime == null) {
        // unknown entity
        return null;
      }
      byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
          .add(entityType).add(revStartTime).add(entityId)
          .getBytesForLookup();

      iterator = db.iterator();
      iterator.seek(prefix);

      return getEntity(entityId, entityType,
          readReverseOrderedLong(revStartTime, 0), fields, iterator, prefix,
          prefix.length);
    } finally {
      IOUtils.cleanup(LOG, iterator);
    }
  }

  /**
   * Read entity from a db iterator that is positioned at the entity's
   * first key. The iterator is advanced past all keys sharing the given
   * prefix. Fields that were not requested are set to null on the returned
   * entity; the returned entity itself is never null.
   *
   * @param fields The fields to populate, or null for all fields
   * @param prefix A byte array whose first prefixlen bytes identify the
   *        entity's keys
   */
  private static TimelineEntity getEntity(String entityId, String entityType,
      Long startTime, EnumSet<Field> fields, DBIterator iterator,
      byte[] prefix, int prefixlen) throws IOException {
    if (fields == null) {
      fields = EnumSet.allOf(Field.class);
    }

    TimelineEntity entity = new TimelineEntity();
    boolean events = false;
    boolean lastEvent = false;
    if (fields.contains(Field.EVENTS)) {
      events = true;
      entity.setEvents(new ArrayList<TimelineEvent>());
    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
      lastEvent = true;
      entity.setEvents(new ArrayList<TimelineEvent>());
    } else {
      entity.setEvents(null);
    }
    boolean relatedEntities = false;
    if (fields.contains(Field.RELATED_ENTITIES)) {
      relatedEntities = true;
    } else {
      entity.setRelatedEntities(null);
    }
    boolean primaryFilters = false;
    if (fields.contains(Field.PRIMARY_FILTERS)) {
      primaryFilters = true;
    } else {
      entity.setPrimaryFilters(null);
    }
    boolean otherInfo = false;
    if (fields.contains(Field.OTHER_INFO)) {
      otherInfo = true;
      entity.setOtherInfo(new HashMap<String, Object>());
    } else {
      entity.setOtherInfo(null);
    }

    // iterate through the entity's entry, parsing information if it is part
    // of a requested field
    for (; iterator.hasNext(); iterator.next()) {
      byte[] key = iterator.peekNext().getKey();
      if (!prefixMatches(prefix, prefixlen, key)) {
        break;
      }
      if (key.length <= prefixlen) {
        // no column byte follows the prefix; nothing to parse
        LOG.warn(String.format("Found key without column for entity %s of " +
            "type %s", entityId, entityType));
        continue;
      }
      byte column = key[prefixlen];
      if (column == PRIMARY_FILTER_COLUMN[0]) {
        if (primaryFilters) {
          addPrimaryFilter(entity, key,
              prefixlen + PRIMARY_FILTER_COLUMN.length);
        }
      } else if (column == OTHER_INFO_COLUMN[0]) {
        if (otherInfo) {
          entity.addOtherInfo(parseRemainingKey(key,
              prefixlen + OTHER_INFO_COLUMN.length),
              GenericObjectMapper.read(iterator.peekNext().getValue()));
        }
      } else if (column == RELATED_COLUMN[0]) {
        if (relatedEntities) {
          addRelatedEntity(entity, key,
              prefixlen + RELATED_COLUMN.length);
        }
      } else if (column == TIME_COLUMN[0]) {
        // events are stored most recent first, so for LAST_EVENT_ONLY the
        // first event encountered is the one to keep
        if (events || (lastEvent && entity.getEvents().size() == 0)) {
          TimelineEvent event = getEntityEvent(null, key, prefixlen +
              TIME_COLUMN.length, iterator.peekNext().getValue());
          if (event != null) {
            entity.addEvent(event);
          }
        }
      } else {
        LOG.warn(String.format("Found unexpected column for entity %s of " +
            "type %s (0x%02x)", entityId, entityType, column));
      }
    }

    entity.setEntityId(entityId);
    entity.setEntityType(entityType);
    entity.setStartTime(startTime);

    return entity;
  }

  @Override
  public TimelineEvents getEntityTimelines(String entityType,
      SortedSet<String> entityIds, Long limit, Long windowStart,
      Long windowEnd, Set<String> eventType) throws IOException {
    TimelineEvents events = new TimelineEvents();
    if (entityIds == null || entityIds.isEmpty()) {
      return events;
    }
    // create a lexicographically-ordered map from start time to entities
    Map<byte[], List<EntityIdentifier>> startTimeMap =
        new TreeMap<byte[], List<EntityIdentifier>>(
            new Comparator<byte[]>() {
              @Override
              public int compare(byte[] o1, byte[] o2) {
                return WritableComparator.compareBytes(o1, 0, o1.length, o2,
                    0, o2.length);
              }
            });

    // look up start times for the specified entities
    // skip entities with no start time
    for (String entity : entityIds) {
      byte[] startTime = getStartTime(entity, entityType, null, null, null);
      if (startTime != null) {
        List<EntityIdentifier> entities = startTimeMap.get(startTime);
        if (entities == null) {
          entities = new ArrayList<EntityIdentifier>();
          startTimeMap.put(startTime, entities);
        }
        entities.add(new EntityIdentifier(entity, entityType));
      }
    }

    // the window and the limit are the same for every entity
    long endTime = Long.MAX_VALUE;
    if (windowEnd != null) {
      endTime = windowEnd;
    }
    byte[] revWindowEnd = writeReverseOrderedLong(endTime);
    byte[] revWindowStart = null;
    if (windowStart != null) {
      revWindowStart = writeReverseOrderedLong(windowStart);
    }
    long maxEvents = DEFAULT_LIMIT;
    if (limit != null) {
      maxEvents = limit;
    }

    for (Entry<byte[], List<EntityIdentifier>> entry :
        startTimeMap.entrySet()) {
      // look up the events matching the given parameters (limit,
      // start time, end time, event types) for entities whose start times
      // were found and add the entities to the return list
      byte[] revStartTime = entry.getKey();
      for (EntityIdentifier entityID : entry.getValue()) {
        EventsOfOneEntity entity = new EventsOfOneEntity();
        entity.setEntityId(entityID.getId());
        entity.setEntityType(entityType);
        events.addEvent(entity);
        KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
            .add(entityType).add(revStartTime).add(entityID.getId())
            .add(TIME_COLUMN);
        byte[] prefix = kb.getBytesForLookup();
        kb.add(revWindowEnd);
        byte[] first = kb.getBytesForLookup();
        byte[] last = null;
        if (revWindowStart != null) {
          last = KeyBuilder.newInstance().add(prefix).add(revWindowStart)
              .getBytesForLookup();
        }
        // one iterator per entity, closed before moving to the next entity
        // so that no iterator is leaked
        DBIterator iterator = db.iterator();
        try {
          for (iterator.seek(first); entity.getEvents().size() < maxEvents &&
              iterator.hasNext(); iterator.next()) {
            byte[] key = iterator.peekNext().getKey();
            if (!prefixMatches(prefix, prefix.length, key) ||
                (last != null && WritableComparator.compareBytes(key, 0,
                    key.length, last, 0, last.length) > 0)) {
              break;
            }
            TimelineEvent event = getEntityEvent(eventType, key,
                prefix.length, iterator.peekNext().getValue());
            if (event != null) {
              entity.addEvent(event);
            }
          }
        } finally {
          IOUtils.cleanup(LOG, iterator);
        }
      }
    }
    return events;
  }

  /**
   * Returns true if the byte array begins with the specified prefix.
   */
  private static boolean prefixMatches(byte[] prefix, int prefixlen,
      byte[] b) {
    if (b.length < prefixlen) {
      return false;
    }
    return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
        prefixlen) == 0;
  }

  @Override
  public TimelineEntities getEntities(String entityType,
      Long limit, Long windowStart, Long windowEnd,
      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
      EnumSet<Field> fields) throws IOException {
    if (primaryFilter == null) {
      // if no primary filter is specified, prefix the lookup with
      // ENTITY_ENTRY_PREFIX
      return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
          windowStart, windowEnd, secondaryFilters, fields);
    } else {
      // if a primary filter is specified, prefix the lookup with
      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
      // ENTITY_ENTRY_PREFIX
      byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
          .add(primaryFilter.getName())
          .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
          .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
          secondaryFilters, fields);
    }
  }

  /**
   * Retrieves a list of entities satisfying given parameters.
   *
   * @param base A byte array prefix for the lookup
   * @param entityType The type of the entity
   * @param limit A limit on the number of entities to return
   * @param starttime The earliest entity start time to retrieve (exclusive)
   * @param endtime The latest entity start time to retrieve (inclusive)
   * @param secondaryFilters Filter pairs that the entities should match
   * @param fields The set of fields to retrieve
   * @return A list of entities
   * @throws IOException
   */
  private TimelineEntities getEntityByTime(byte[] base,
      String entityType, Long limit, Long starttime, Long endtime,
      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields)
      throws IOException {
    // Secondary filters are evaluated against other info and primary
    // filters, so those fields have to be read even if the caller did not
    // request them; they are stripped again before the entity is returned.
    // A null field set means "all fields", so nothing needs to be added.
    EnumSet<Field> readFields = fields;
    boolean stripOtherInfo = false;
    boolean stripPrimaryFilters = false;
    if (fields != null && secondaryFilters != null &&
        !secondaryFilters.isEmpty()) {
      readFields = EnumSet.copyOf(fields);
      stripOtherInfo = readFields.add(Field.OTHER_INFO);
      stripPrimaryFilters = readFields.add(Field.PRIMARY_FILTERS);
    }

    DBIterator iterator = null;
    try {
      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
      // only db keys matching the prefix (base + entity type) will be parsed
      byte[] prefix = kb.getBytesForLookup();
      if (endtime == null) {
        // if end time is null, place no restriction on end time
        endtime = Long.MAX_VALUE;
      }
      // using end time, construct a first key that will be seeked to
      byte[] revts = writeReverseOrderedLong(endtime);
      kb.add(revts);
      byte[] first = kb.getBytesForLookup();
      byte[] last = null;
      if (starttime != null) {
        // if start time is not null, set a last key that will not be
        // iterated past
        last = KeyBuilder.newInstance().add(base).add(entityType)
            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
      }
      if (limit == null) {
        // if limit is not specified, use the default
        limit = DEFAULT_LIMIT;
      }

      TimelineEntities entities = new TimelineEntities();
      iterator = db.iterator();
      iterator.seek(first);
      // iterate until one of the following conditions is met: limit is
      // reached, there are no more keys, the key prefix no longer matches,
      // or a start time has been specified and reached/exceeded
      while (entities.getEntities().size() < limit && iterator.hasNext()) {
        byte[] key = iterator.peekNext().getKey();
        if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
            WritableComparator.compareBytes(key, 0, key.length, last, 0,
                last.length) > 0)) {
          break;
        }
        // read the start time and entityId from the current key
        KeyParser kp = new KeyParser(key, prefix.length);
        Long startTime = kp.getNextLong();
        String entityId = kp.getNextString();
        // parse the entity that owns this key, iterating over all keys for
        // the entity; this is what advances the iterator
        TimelineEntity entity = getEntity(entityId, entityType, startTime,
            readFields, iterator, key, kp.getOffset());
        // determine if the retrieved entity matches the provided secondary
        // filters, and if so add it to the list of entities to return
        if (matchesSecondaryFilters(entity, secondaryFilters)) {
          if (stripOtherInfo) {
            entity.setOtherInfo(null);
          }
          if (stripPrimaryFilters) {
            entity.setPrimaryFilters(null);
          }
          entities.addEntity(entity);
        }
      }
      return entities;
    } finally {
      IOUtils.cleanup(LOG, iterator);
    }
  }

  /**
   * Returns true if the entity matches every secondary filter. A filter
   * matches if the entity's other info maps the filter name to an equal
   * value or, when other info has no such name, if the entity's primary
   * filter of that name contains the value. An entity that has neither
   * does not match.
   */
  private static boolean matchesSecondaryFilters(TimelineEntity entity,
      Collection<NameValuePair> secondaryFilters) {
    if (secondaryFilters == null) {
      return true;
    }
    Map<String, Object> otherInfo = entity.getOtherInfo();
    Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
    for (NameValuePair filter : secondaryFilters) {
      Object v = otherInfo == null ? null : otherInfo.get(filter.getName());
      if (v != null) {
        if (!v.equals(filter.getValue())) {
          return false;
        }
      } else {
        Set<Object> vs = primaryFilters == null ? null
            : primaryFilters.get(filter.getName());
        if (vs == null || !vs.contains(filter.getValue())) {
          return false;
        }
      }
    }
    return true;
  }

  /**
   * Put a single entity. If there is an error, add a TimelinePutError to the
   * given response. All entries for the entity are written in one leveldb
   * write batch.
   */
  private void put(TimelineEntity entity, TimelinePutResponse response) {
    WriteBatch writeBatch = null;
    try {
      writeBatch = db.createWriteBatch();
      List<TimelineEvent> events = entity.getEvents();
      // look up the start time for the entity
      byte[] revStartTime = getStartTime(entity.getEntityId(),
          entity.getEntityType(), entity.getStartTime(), events,
          writeBatch);
      if (revStartTime == null) {
        // if no start time is found, add an error and return
        TimelinePutError error = new TimelinePutError();
        error.setEntityId(entity.getEntityId());
        error.setEntityType(entity.getEntityType());
        error.setErrorCode(TimelinePutError.NO_START_TIME);
        response.addError(error);
        return;
      }
      // the decoded (not reverse ordered) start time, as kept in the cache
      Long startTimeLong = readReverseOrderedLong(revStartTime, 0);
      Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();

      // write event entries
      if (events != null && !events.isEmpty()) {
        for (TimelineEvent event : events) {
          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
          byte[] key = createEntityEventKey(entity.getEntityId(),
              entity.getEntityType(), revStartTime, revts,
              event.getEventType());
          byte[] value = GenericObjectMapper.write(event.getEventInfo());
          writeBatch.put(key, value);
          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
        }
      }

      // write related entity entries
      Map<String, ? extends Collection<String>> relatedEntities =
          entity.getRelatedEntities();
      if (relatedEntities != null && !relatedEntities.isEmpty()) {
        for (Entry<String, ? extends Collection<String>> relatedEntityIds :
            relatedEntities.entrySet()) {
          String relatedEntityType = relatedEntityIds.getKey();
          for (String relatedEntityId : relatedEntityIds.getValue()) {
            // look up start time of related entity
            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
                relatedEntityType, null, null, writeBatch);
            if (relatedEntityStartTime == null) {
              // if start time is not found, set start time of the related
              // entity to the start time of this entity, and write it to the
              // db and the cache
              relatedEntityStartTime = revStartTime;
              writeBatch.put(createStartTimeLookupKey(relatedEntityId,
                  relatedEntityType), relatedEntityStartTime);
              startTimeCache.put(new EntityIdentifier(relatedEntityId,
                  relatedEntityType), startTimeLong);
            }
            // write reverse entry (related entity -> entity)
            byte[] key = createRelatedEntityKey(relatedEntityId,
                relatedEntityType, relatedEntityStartTime,
                entity.getEntityId(), entity.getEntityType());
            writeBatch.put(key, EMPTY_BYTES);
            // TODO: write forward entry (entity -> related entity)?
          }
        }
      }

      // write primary filter entries
      if (primaryFilters != null && !primaryFilters.isEmpty()) {
        for (Entry<String, Set<Object>> primaryFilter :
            primaryFilters.entrySet()) {
          for (Object primaryFilterValue : primaryFilter.getValue()) {
            byte[] key = createPrimaryFilterKey(entity.getEntityId(),
                entity.getEntityType(), revStartTime,
                primaryFilter.getKey(), primaryFilterValue);
            writeBatch.put(key, EMPTY_BYTES);
            writePrimaryFilterEntries(writeBatch, primaryFilters, key,
                EMPTY_BYTES);
          }
        }
      }

      // write other info entries
      Map<String, Object> otherInfo = entity.getOtherInfo();
      if (otherInfo != null && !otherInfo.isEmpty()) {
        for (Entry<String, Object> i : otherInfo.entrySet()) {
          byte[] key = createOtherInfoKey(entity.getEntityId(),
              entity.getEntityType(), revStartTime, i.getKey());
          byte[] value = GenericObjectMapper.write(i.getValue());
          writeBatch.put(key, value);
          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
        }
      }
      db.write(writeBatch);
    } catch (IOException e) {
      LOG.error("Error putting entity " + entity.getEntityId() +
          " of type " + entity.getEntityType(), e);
      TimelinePutError error = new TimelinePutError();
      error.setEntityId(entity.getEntityId());
      error.setEntityType(entity.getEntityType());
      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
      response.addError(error);
    } finally {
      IOUtils.cleanup(LOG, writeBatch);
    }
  }

  /**
   * For a given key / value pair that has been written to the db,
   * write additional entries to the db for each primary filter.
   */
  private static void writePrimaryFilterEntries(WriteBatch writeBatch,
      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
      throws IOException {
    if (primaryFilters != null && !primaryFilters.isEmpty()) {
      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
        for (Object pfval : pf.getValue()) {
          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
              key), value);
        }
      }
    }
  }

  @Override
  public TimelinePutResponse put(TimelineEntities entities) {
    TimelinePutResponse response = new TimelinePutResponse();
    for (TimelineEntity entity : entities.getEntities()) {
      put(entity, response);
    }
    return response;
  }

  /**
   * Get the unique start time for a given entity as a byte array that sorts
   * the timestamps in reverse order (see {@link
   * GenericObjectMapper#writeReverseOrderedLong(long)}).
   *
   * <p>A start time already known to the cache (or, when none is provided,
   * to the db) takes precedence over the provided start time and events.
   * On a put, a newly determined start time is added to the write batch
   * and to the cache.
   *
   * @param entityId The id of the entity
   * @param entityType The type of the entity
   * @param startTime The start time of the entity, or null
   * @param events A list of events for the entity, or null
   * @param writeBatch A leveldb write batch, if the method is called by a
   *                   put as opposed to a get
   * @return A byte array, or null if no start time is known and none can be
   *         derived (not a put, or no events were provided)
   * @throws IOException
   */
  private byte[] getStartTime(String entityId, String entityType,
      Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
      throws IOException {
    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
    // a single get avoids racing with eviction between containsKey and get
    Long cachedStartTime = startTimeCache.get(entity);
    if (startTime == null) {
      // start time is not provided, so try to look it up
      if (cachedStartTime != null) {
        // found the start time in the cache
        startTime = cachedStartTime;
      } else {
        // try to look up the start time in the db
        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
        byte[] v = db.get(b);
        if (v == null) {
          // did not find the start time in the db
          // if this is a put, try to set it from the provided events
          if (events == null || events.isEmpty() || writeBatch == null) {
            // no events to derive a start time from, or not a put, so
            // return null
            return null;
          }
          long min = Long.MAX_VALUE;
          for (TimelineEvent e : events) {
            if (min > e.getTimestamp()) {
              min = e.getTimestamp();
            }
          }
          startTime = min;
          // selected start time as minimum timestamp of provided events
          // write start time to db and cache
          writeBatch.put(b, writeReverseOrderedLong(startTime));
          startTimeCache.put(entity, startTime);
        } else {
          // found the start time in the db
          startTime = readReverseOrderedLong(v, 0);
          if (writeBatch != null) {
            // if this is a put, re-add the start time to the cache
            startTimeCache.put(entity, startTime);
          }
        }
      }
    } else {
      // start time is provided
      // TODO: verify start time in db as well as cache?
      if (cachedStartTime != null) {
        // if the start time is already in the cache, the cached value wins
        // over the provided one
        startTime = cachedStartTime;
      } else if (writeBatch != null) {
        // if this is a put, write the provided start time to the db and the
        // cache
        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
        writeBatch.put(b, writeReverseOrderedLong(startTime));
        startTimeCache.put(entity, startTime);
      }
    }
    return writeReverseOrderedLong(startTime);
  }

  /**
   * Creates a key for looking up the start time of a given entity,
   * of the form START_TIME_LOOKUP_PREFIX + entitytype + entity.
   */
  private static byte[] createStartTimeLookupKey(String entity,
      String entitytype) throws IOException {
    return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
        .add(entitytype).add(entity).getBytes();
  }

  /**
   * Creates an index entry for the given key of the form
   * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
   */
  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
      Object primaryFilterValue, byte[] key) throws IOException {
    return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
        .add(primaryFilterName)
        .add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
        .getBytes();
  }

  /**
   * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entitytype +
   * revstarttime + entity + TIME_COLUMN + reveventtimestamp + eventtype.
   */
  private static byte[] createEntityEventKey(String entity, String entitytype,
      byte[] revStartTime, byte[] reveventtimestamp, String eventtype)
      throws IOException {
    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
        .add(entitytype).add(revStartTime).add(entity).add(TIME_COLUMN)
        .add(reveventtimestamp).add(eventtype).getBytes();
  }

  /**
   * Creates an event object from the given key, offset, and value. If the
   * event type is not contained in the specified set of event types,
   * returns null. A null set of event types accepts every event type.
   *
   * @throws IOException if the key cannot be parsed or the value does not
   *         deserialize to a map (or null)
   */
  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
      byte[] key, int offset, byte[] value) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    long ts = kp.getNextLong();
    String tstype = kp.getNextString();
    if (eventTypes != null && !eventTypes.contains(tstype)) {
      return null;
    }
    TimelineEvent event = new TimelineEvent();
    event.setTimestamp(ts);
    event.setEventType(tstype);
    Object o = GenericObjectMapper.read(value);
    if (o == null) {
      event.setEventInfo(null);
    } else if (o instanceof Map) {
      @SuppressWarnings("unchecked")
      Map<String, Object> m = (Map<String, Object>) o;
      event.setEventInfo(m);
    } else {
      throw new IOException("Couldn't deserialize event info map");
    }
    return event;
  }

  /**
   * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
   * entitytype + revstarttime + entity + PRIMARY_FILTER_COLUMN + name + value.
   */
  private static byte[] createPrimaryFilterKey(String entity,
      String entitytype, byte[] revStartTime, String name, Object value)
      throws IOException {
    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
        .add(revStartTime).add(entity).add(PRIMARY_FILTER_COLUMN).add(name)
        .add(GenericObjectMapper.write(value)).getBytes();
  }

  /**
   * Parses the primary filter from the given key at the given offset and
   * adds it to the given entity.
   */
  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String name = kp.getNextString();
    Object value = GenericObjectMapper.read(key, kp.getOffset());
    entity.addPrimaryFilter(name, value);
  }

  /**
   * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entitytype +
   * revstarttime + entity + OTHER_INFO_COLUMN + name.
   */
  private static byte[] createOtherInfoKey(String entity, String entitytype,
      byte[] revStartTime, String name) throws IOException {
    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
        .add(revStartTime).add(entity).add(OTHER_INFO_COLUMN).add(name)
        .getBytes();
  }

  /**
   * Creates a string representation of the byte array from the given offset
   * to the end of the array (for parsing other info keys).
   */
  private static String parseRemainingKey(byte[] b, int offset) {
    return new String(b, offset, b.length - offset, UTF_8);
  }

  /**
   * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
   * entitytype + revstarttime + entity + RELATED_COLUMN + relatedentitytype +
   * relatedentity.
   */
  private static byte[] createRelatedEntityKey(String entity,
      String entitytype, byte[] revStartTime, String relatedEntity,
      String relatedEntityType) throws IOException {
    return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entitytype)
        .add(revStartTime).add(entity).add(RELATED_COLUMN)
        .add(relatedEntityType).add(relatedEntity).getBytes();
  }

  /**
   * Parses the related entity from the given key at the given offset and
   * adds it to the given entity.
   */
  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
      int offset) throws IOException {
    KeyParser kp = new KeyParser(key, offset);
    String type = kp.getNextString();
    String id = kp.getNextString();
    entity.addRelatedEntity(type, id);
  }

  /**
   * Clears the cache to test reloading start times from leveldb (only for
   * testing).
   */
  @VisibleForTesting
  void clearStartTimeCache() {
    startTimeCache.clear();
  }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/MemoryTimelineStore.java Sat Feb 22 20:55:06
2014 @@ -0,0 +1,306 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; + +/** + * In-memory implementation of {@link TimelineStore}. 
This + * implementation is for test purpose only. If users improperly instantiate it, + * they may encounter reading and writing history data in different memory + * store. + * + */ +@Private +@Unstable +public class MemoryTimelineStore + extends AbstractService implements TimelineStore { + + private Map entities = + new HashMap(); + + public MemoryTimelineStore() { + super(MemoryTimelineStore.class.getName()); + } + + @Override + public TimelineEntities getEntities(String entityType, Long limit, + Long windowStart, Long windowEnd, NameValuePair primaryFilter, + Collection secondaryFilters, EnumSet fields) { + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + if (fields == null) { + fields = EnumSet.allOf(Field.class); + } + List entitiesSelected = new ArrayList(); + for (TimelineEntity entity : new PriorityQueue(entities.values())) { + if (entitiesSelected.size() >= limit) { + break; + } + if (!entity.getEntityType().equals(entityType)) { + continue; + } + if (entity.getStartTime() <= windowStart) { + continue; + } + if (entity.getStartTime() > windowEnd) { + continue; + } + if (primaryFilter != null && + !matchPrimaryFilter(entity.getPrimaryFilters(), primaryFilter)) { + continue; + } + if (secondaryFilters != null) { // OR logic + boolean flag = false; + for (NameValuePair secondaryFilter : secondaryFilters) { + if (secondaryFilter != null && + matchFilter(entity.getOtherInfo(), secondaryFilter)) { + flag = true; + break; + } + } + if (!flag) { + continue; + } + } + entitiesSelected.add(entity); + } + List entitiesToReturn = new ArrayList(); + for (TimelineEntity entitySelected : entitiesSelected) { + entitiesToReturn.add(maskFields(entitySelected, fields)); + } + Collections.sort(entitiesToReturn); + TimelineEntities entitiesWrapper = new TimelineEntities(); + entitiesWrapper.setEntities(entitiesToReturn); + return entitiesWrapper; + } 
+ + @Override + public TimelineEntity getEntity(String entityId, String entityType, + EnumSet fieldsToRetrieve) { + if (fieldsToRetrieve == null) { + fieldsToRetrieve = EnumSet.allOf(Field.class); + } + TimelineEntity entity = entities.get(new EntityIdentifier(entityId, entityType)); + if (entity == null) { + return null; + } else { + return maskFields(entity, fieldsToRetrieve); + } + } + + @Override + public TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, + Set eventTypes) { + TimelineEvents allEvents = new TimelineEvents(); + if (entityIds == null) { + return allEvents; + } + if (limit == null) { + limit = DEFAULT_LIMIT; + } + if (windowStart == null) { + windowStart = Long.MIN_VALUE; + } + if (windowEnd == null) { + windowEnd = Long.MAX_VALUE; + } + for (String entityId : entityIds) { + EntityIdentifier entityID = new EntityIdentifier(entityId, entityType); + TimelineEntity entity = entities.get(entityID); + if (entity == null) { + continue; + } + EventsOfOneEntity events = new EventsOfOneEntity(); + events.setEntityId(entityId); + events.setEntityType(entityType); + for (TimelineEvent event : entity.getEvents()) { + if (events.getEvents().size() >= limit) { + break; + } + if (event.getTimestamp() <= windowStart) { + continue; + } + if (event.getTimestamp() > windowEnd) { + continue; + } + if (eventTypes != null && !eventTypes.contains(event.getEventType())) { + continue; + } + events.addEvent(event); + } + allEvents.addEvent(events); + } + return allEvents; + } + + @Override + public TimelinePutResponse put(TimelineEntities data) { + TimelinePutResponse response = new TimelinePutResponse(); + for (TimelineEntity entity : data.getEntities()) { + EntityIdentifier entityId = + new EntityIdentifier(entity.getEntityId(), entity.getEntityType()); + // store entity info in memory + TimelineEntity existingEntity = entities.get(entityId); + if (existingEntity == null) { + existingEntity = new 
TimelineEntity(); + existingEntity.setEntityId(entity.getEntityId()); + existingEntity.setEntityType(entity.getEntityType()); + existingEntity.setStartTime(entity.getStartTime()); + entities.put(entityId, existingEntity); + } + if (entity.getEvents() != null) { + if (existingEntity.getEvents() == null) { + existingEntity.setEvents(entity.getEvents()); + } else { + existingEntity.addEvents(entity.getEvents()); + } + Collections.sort(existingEntity.getEvents()); + } + // check startTime + if (existingEntity.getStartTime() == null) { + if (existingEntity.getEvents() == null + || existingEntity.getEvents().isEmpty()) { + TimelinePutError error = new TimelinePutError(); + error.setEntityId(entityId.getId()); + error.setEntityType(entityId.getType()); + error.setErrorCode(TimelinePutError.NO_START_TIME); + response.addError(error); + entities.remove(entityId); + continue; + } else { + existingEntity.setStartTime(entity.getEvents().get(0).getTimestamp()); + } + } + if (entity.getPrimaryFilters() != null) { + if (existingEntity.getPrimaryFilters() == null) { + existingEntity.setPrimaryFilters(entity.getPrimaryFilters()); + } else { + existingEntity.addPrimaryFilters(entity.getPrimaryFilters()); + } + } + if (entity.getOtherInfo() != null) { + if (existingEntity.getOtherInfo() == null) { + existingEntity.setOtherInfo(entity.getOtherInfo()); + } else { + existingEntity.addOtherInfo(entity.getOtherInfo()); + } + } + // relate it to other entities + if (entity.getRelatedEntities() == null) { + continue; + } + for (Map.Entry> partRelatedEntities : entity + .getRelatedEntities().entrySet()) { + if (partRelatedEntities == null) { + continue; + } + for (String idStr : partRelatedEntities.getValue()) { + EntityIdentifier relatedEntityId = + new EntityIdentifier(idStr, partRelatedEntities.getKey()); + TimelineEntity relatedEntity = entities.get(relatedEntityId); + if (relatedEntity != null) { + relatedEntity.addRelatedEntity( + existingEntity.getEntityType(), 
existingEntity.getEntityId()); + } else { + relatedEntity = new TimelineEntity(); + relatedEntity.setEntityId(relatedEntityId.getId()); + relatedEntity.setEntityType(relatedEntityId.getType()); + relatedEntity.setStartTime(existingEntity.getStartTime()); + relatedEntity.addRelatedEntity(existingEntity.getEntityType(), + existingEntity.getEntityId()); + entities.put(relatedEntityId, relatedEntity); + } + } + } + } + return response; + } + + private static TimelineEntity maskFields( + TimelineEntity entity, EnumSet fields) { + // Conceal the fields that are not going to be exposed + TimelineEntity entityToReturn = new TimelineEntity(); + entityToReturn.setEntityId(entity.getEntityId()); + entityToReturn.setEntityType(entity.getEntityType()); + entityToReturn.setStartTime(entity.getStartTime()); + entityToReturn.setEvents(fields.contains(Field.EVENTS) ? + entity.getEvents() : fields.contains(Field.LAST_EVENT_ONLY) ? + Arrays.asList(entity.getEvents().get(0)) : null); + entityToReturn.setRelatedEntities(fields.contains(Field.RELATED_ENTITIES) ? + entity.getRelatedEntities() : null); + entityToReturn.setPrimaryFilters(fields.contains(Field.PRIMARY_FILTERS) ? + entity.getPrimaryFilters() : null); + entityToReturn.setOtherInfo(fields.contains(Field.OTHER_INFO) ? 
+ entity.getOtherInfo() : null); + return entityToReturn; + } + + private static boolean matchFilter(Map tags, + NameValuePair filter) { + Object value = tags.get(filter.getName()); + if (value == null) { // doesn't have the filter + return false; + } else if (!value.equals(filter.getValue())) { // doesn't match the filter + return false; + } + return true; + } + + private static boolean matchPrimaryFilter(Map> tags, + NameValuePair filter) { + Set value = tags.get(filter.getName()); + if (value == null) { // doesn't have the filter + return false; + } else { + return value.contains(filter.getValue()); + } + } + +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/NameValuePair.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * A class holding a name and value pair, used for specifying filters in + * {@link TimelineReader}. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class NameValuePair { + String name; + Object value; + + public NameValuePair(String name, Object value) { + this.name = name; + this.value = value; + } + + /** + * Get the name. + * @return The name. + */ + public String getName() { + + return name; + } + + /** + * Get the value. + * @return The value. 
+ */ + public Object getValue() { + return value; + } + + @Override + public String toString() { + return "{ name: " + name + ", value: " + value + " }"; + } +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineReader.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import java.io.IOException; +import java.util.Collection; +import java.util.EnumSet; +import java.util.Set; +import java.util.SortedSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents; + +/** + * This interface is for retrieving timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface TimelineReader { + + /** + * Possible fields to retrieve for {@link #getEntities} and {@link #getEntity} + * . + */ + enum Field { + EVENTS, + RELATED_ENTITIES, + PRIMARY_FILTERS, + OTHER_INFO, + LAST_EVENT_ONLY + } + + /** + * Default limit for {@link #getEntities} and {@link #getEntityTimelines}. + */ + final long DEFAULT_LIMIT = 100; + + /** + * This method retrieves a list of entity information, {@link TimelineEntity}, sorted + * by the starting timestamp for the entity, descending. + * + * @param entityType + * The type of entities to return (required). + * @param limit + * A limit on the number of entities to return. If null, defaults to + * {@link #DEFAULT_LIMIT}. + * @param windowStart + * The earliest start timestamp to retrieve (exclusive). If null, + * defaults to retrieving all entities until the limit is reached. + * @param windowEnd + * The latest start timestamp to retrieve (inclusive). If null, + * defaults to {@link Long#MAX_VALUE} + * @param primaryFilter + * Retrieves only entities that have the specified primary filter. If + * null, retrieves all entities. This is an indexed retrieval, and no + * entities that do not match the filter are scanned. 
+ * @param secondaryFilters + * Retrieves only entities that have exact matches for all the + * specified filters in their primary filters or other info. This is + * not an indexed retrieval, so all entities are scanned but only + * those matching the filters are returned. + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve (see + * {@link Field}). If the set of fields contains + * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the + * most recent event for each entity is retrieved. If null, retrieves + * all fields. + * @return A {@link TimelineEntities} object. + * @throws IOException + */ + TimelineEntities getEntities(String entityType, + Long limit, Long windowStart, Long windowEnd, + NameValuePair primaryFilter, Collection secondaryFilters, + EnumSet fieldsToRetrieve) throws IOException; + + /** + * This method retrieves the entity information for a given entity. + * + * @param entityId + * The entity whose information will be retrieved. + * @param entityType + * The type of the entity. + * @param fieldsToRetrieve + * Specifies which fields of the entity object to retrieve (see + * {@link Field}). If the set of fields contains + * {@link Field#LAST_EVENT_ONLY} and not {@link Field#EVENTS}, the + * most recent event for the entity is retrieved. If null, retrieves + * all fields. + * @return A {@link TimelineEntity} object. + * @throws IOException + */ + TimelineEntity getEntity(String entityId, String entityType, EnumSet + fieldsToRetrieve) throws IOException; + + /** + * This method retrieves the events for a list of entities all of the same + * entity type. The events for each entity are sorted in order of their + * timestamps, descending. + * + * @param entityType + * The type of entities to retrieve events for. + * @param entityIds + * The entity IDs to retrieve events for. + * @param limit + * A limit on the number of events to return for each entity.
If + * null, defaults to {@link #DEFAULT_LIMIT} events per entity. + * @param windowStart + * If not null, retrieves only events later than the given time + * (exclusive) + * @param windowEnd + * If not null, retrieves only events earlier than the given time + * (inclusive) + * @param eventTypes + * Restricts the events returned to the given types. If null, events + * of all types will be returned. + * @return An {@link TimelineEvents} object. + * @throws IOException + */ + TimelineEvents getEntityTimelines(String entityType, + SortedSet entityIds, Long limit, Long windowStart, + Long windowEnd, Set eventTypes) throws IOException; +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineStore.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.Service; + +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface TimelineStore extends + Service, TimelineReader, TimelineWriter { +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TimelineWriter.java 
Sat Feb 22 20:55:06 2014 @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; + +import java.io.IOException; + +/** + * This interface is for storing timeline information. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public interface TimelineWriter { + + /** + * Stores entity information to the timeline store. Any errors occurring for + * individual put request objects will be reported in the response. + * + * @param data + * An {@link TimelineEntities} object. + * @return An {@link TimelinePutResponse} object. 
+ * @throws IOException + */ + TimelinePutResponse put(TimelineEntities data) throws IOException; + +} Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/package-info.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,20 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +@InterfaceAudience.Private +package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline; +import org.apache.hadoop.classification.InterfaceAudience; Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1570922&r1=1570921&r2=1570922&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Sat Feb 22 20:55:06 2014 @@ -21,7 +21,7 @@ import static org.apache.hadoop.yarn.uti import org.apache.hadoop.yarn.server.api.ApplicationContext; import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager; -import org.apache.hadoop.yarn.server.applicationhistoryservice.apptimeline.ApplicationTimelineStore; +import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.WebApp; import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; @@ -30,22 +30,22 @@ import org.apache.hadoop.yarn.webapp.Yar public class AHSWebApp extends WebApp implements YarnWebParams { private final ApplicationHistoryManager applicationHistoryManager; - private final 
ApplicationTimelineStore applicationTimelineStore; + private final TimelineStore timelineStore; public AHSWebApp(ApplicationHistoryManager applicationHistoryManager, - ApplicationTimelineStore applicationTimelineStore) { + TimelineStore timelineStore) { this.applicationHistoryManager = applicationHistoryManager; - this.applicationTimelineStore = applicationTimelineStore; + this.timelineStore = timelineStore; } @Override public void setup() { bind(YarnJacksonJaxbJsonProvider.class); bind(AHSWebServices.class); - bind(ATSWebServices.class); + bind(TimelineWebServices.class); bind(GenericExceptionHandler.class); bind(ApplicationContext.class).toInstance(applicationHistoryManager); - bind(ApplicationTimelineStore.class).toInstance(applicationTimelineStore); + bind(TimelineStore.class).toInstance(timelineStore); route("/", AHSController.class); route(pajoin("/apps", APP_STATE), AHSController.class); route(pajoin("/app", APPLICATION_ID), AHSController.class, "app"); Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java (added) +++ 
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,328 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */

package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.NameValuePair;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.webapp.BadRequestException;

import com.google.inject.Inject;
import com.google.inject.Singleton;

/**
 * REST resource rooted at <code>/ws/v1/timeline</code> that reads entities
 * and events from, and writes entities to, the injected
 * {@link TimelineStore}. All request parameters arrive as strings and are
 * parsed by the private helpers at the bottom of this class; malformed
 * parameters are reported as a {@link BadRequestException} and store I/O
 * failures as an internal server error.
 */
@Singleton
@Path("/ws/v1/timeline")
//TODO: support XML serialization/deserialization
public class TimelineWebServices {

  private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);

  /** Backing store that every endpoint delegates to. */
  private final TimelineStore store;

  /**
   * @param store the timeline store used to serve and persist data
   */
  @Inject
  public TimelineWebServices(TimelineStore store) {
    this.store = store;
  }

  /**
   * Serializable bean holding the one-line description returned by
   * {@link TimelineWebServices#about}.
   */
  @XmlRootElement(name = "about")
  @XmlAccessorType(XmlAccessType.NONE)
  @Public
  @Unstable
  public static class AboutInfo {

    private String about;

    /** No-argument constructor; leaves the description unset. */
    public AboutInfo() {

    }

    /**
     * @param about the description text
     */
    public AboutInfo(String about) {
      this.about = about;
    }

    /**
     * @return the description text, serialized as the "About" element
     */
    @XmlElement(name = "About")
    public String getAbout() {
      return about;
    }

    /**
     * @param about the description text
     */
    public void setAbout(String about) {
      this.about = about;
    }

  }

  /**
   * Return the description of the timeline web services.
   */
  @GET
  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public AboutInfo about(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res) {
    init(res);
    return new AboutInfo("Timeline API");
  }

  /**
   * Return a list of entities that match the given parameters.
   *
   * @param entityType      entity type taken from the request path
   * @param primaryFilter   optional <code>name:value</code> pair
   * @param secondaryFilter optional comma-separated <code>name:value</code>
   *                        pairs
   * @param windowStart     optional numeric value
   * @param windowEnd       optional numeric value
   * @param limit           optional numeric value
   * @param fields          optional comma-separated field names
   * @return the entities returned by the store, or an empty
   *         {@link TimelineEntities} when the store returns null
   * @throws BadRequestException if a numeric parameter, a filter or a field
   *         name cannot be parsed
   * @throws WebApplicationException with an internal-server-error status if
   *         the store throws an {@link IOException}
   */
  @GET
  @Path("/{entityType}")
  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public TimelineEntities getEntities(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @QueryParam("primaryFilter") String primaryFilter,
      @QueryParam("secondaryFilter") String secondaryFilter,
      @QueryParam("windowStart") String windowStart,
      @QueryParam("windowEnd") String windowEnd,
      @QueryParam("limit") String limit,
      @QueryParam("fields") String fields) {
    init(res);

    // Parse the filters on their own so that a malformed pair is reported
    // with a message about filters rather than about fields.
    NameValuePair primary;
    Collection<NameValuePair> secondary;
    try {
      primary = parsePairStr(primaryFilter, ":");
      secondary = parsePairsStr(secondaryFilter, ",", ":");
    } catch (IllegalArgumentException e) {
      throw new BadRequestException(
          "primaryFilter or secondaryFilter is not a name:value pair.");
    }

    TimelineEntities entities = null;
    try {
      entities = store.getEntities(
          parseStr(entityType),
          parseLongStr(limit),
          parseLongStr(windowStart),
          parseLongStr(windowEnd),
          primary,
          secondary,
          parseFieldsStr(fields, ","));
    } catch (NumberFormatException e) {
      // Must precede the IllegalArgumentException clause: it is a subclass.
      throw new BadRequestException(
          "windowStart, windowEnd or limit is not a numeric value.");
    } catch (IllegalArgumentException e) {
      throw new BadRequestException("requested invalid field.");
    } catch (IOException e) {
      LOG.error("Error getting entities", e);
      throw new WebApplicationException(e,
          Response.Status.INTERNAL_SERVER_ERROR);
    }
    if (entities == null) {
      return new TimelineEntities();
    }
    return entities;
  }

  /**
   * Return a single entity of the given entity type and Id.
   *
   * @param entityType entity type taken from the request path
   * @param entityId   entity id taken from the request path
   * @param fields     optional comma-separated field names
   * @return the entity returned by the store
   * @throws BadRequestException if a field name is not recognized
   * @throws WebApplicationException with a not-found status if the store
   *         returns null, or an internal-server-error status if the store
   *         throws an {@link IOException}
   */
  @GET
  @Path("/{entityType}/{entityId}")
  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public TimelineEntity getEntity(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @PathParam("entityId") String entityId,
      @QueryParam("fields") String fields) {
    init(res);
    TimelineEntity entity = null;
    try {
      entity =
          store.getEntity(parseStr(entityId), parseStr(entityType),
            parseFieldsStr(fields, ","));
    } catch (IllegalArgumentException e) {
      throw new BadRequestException(
          "requested invalid field.");
    } catch (IOException e) {
      LOG.error("Error getting entity", e);
      throw new WebApplicationException(e,
          Response.Status.INTERNAL_SERVER_ERROR);
    }
    if (entity == null) {
      throw new WebApplicationException(Response.Status.NOT_FOUND);
    }
    return entity;
  }

  /**
   * Return the events that match the given parameters.
   *
   * @param entityType  entity type taken from the request path
   * @param entityId    optional comma-separated entity ids
   * @param eventType   optional comma-separated event types
   * @param windowStart optional numeric value
   * @param windowEnd   optional numeric value
   * @param limit       optional numeric value
   * @return the events returned by the store, or an empty
   *         {@link TimelineEvents} when the store returns null
   * @throws BadRequestException if a numeric parameter cannot be parsed
   * @throws WebApplicationException with an internal-server-error status if
   *         the store throws an {@link IOException}
   */
  @GET
  @Path("/{entityType}/events")
  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public TimelineEvents getEvents(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      @PathParam("entityType") String entityType,
      @QueryParam("entityId") String entityId,
      @QueryParam("eventType") String eventType,
      @QueryParam("windowStart") String windowStart,
      @QueryParam("windowEnd") String windowEnd,
      @QueryParam("limit") String limit) {
    init(res);
    TimelineEvents events = null;
    try {
      events = store.getEntityTimelines(
          parseStr(entityType),
          parseArrayStr(entityId, ","),
          parseLongStr(limit),
          parseLongStr(windowStart),
          parseLongStr(windowEnd),
          parseArrayStr(eventType, ","));
    } catch (NumberFormatException e) {
      throw new BadRequestException(
          "windowStart, windowEnd or limit is not a numeric value.");
    } catch (IOException e) {
      LOG.error("Error getting entity timelines", e);
      throw new WebApplicationException(e,
          Response.Status.INTERNAL_SERVER_ERROR);
    }
    if (events == null) {
      return new TimelineEvents();
    }
    return events;
  }

  /**
   * Store the given entities into the timeline store, and return the errors
   * that happen during storing.
   *
   * @param entities the request body; when null an empty
   *                 {@link TimelinePutResponse} is returned and the store
   *                 is not called
   * @throws WebApplicationException with an internal-server-error status if
   *         the store throws an {@link IOException}
   */
  @POST
  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
  public TimelinePutResponse postEntities(
      @Context HttpServletRequest req,
      @Context HttpServletResponse res,
      TimelineEntities entities) {
    init(res);
    if (entities == null) {
      return new TimelinePutResponse();
    }
    try {
      return store.put(entities);
    } catch (IOException e) {
      LOG.error("Error putting entities", e);
      throw new WebApplicationException(e,
          Response.Status.INTERNAL_SERVER_ERROR);
    }
  }

  /**
   * Resets the content type on the servlet response before an endpoint
   * produces its result.
   */
  private void init(HttpServletResponse response) {
    // NOTE(review): presumably clears a content type set earlier in the
    // request chain so the @Produces media type applies; confirm.
    response.setContentType(null);
  }

  /**
   * Splits <code>str</code> on <code>delimiter</code> (interpreted as a
   * regular expression by {@link String#split(String)}) and returns the
   * trimmed tokens as a sorted set.
   *
   * @return the tokens, or null when <code>str</code> is null
   */
  private static SortedSet<String> parseArrayStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    SortedSet<String> strSet = new TreeSet<String>();
    for (String aStr : str.split(delimiter)) {
      strSet.add(aStr.trim());
    }
    return strSet;
  }

  /**
   * Parses a <code>name&lt;delimiter&gt;value</code> string, splitting at
   * the first occurrence of the delimiter only and trimming both halves.
   *
   * @return the pair, or null when <code>str</code> is null
   * @throws IllegalArgumentException if <code>str</code> does not contain
   *         the delimiter
   */
  private static NameValuePair parsePairStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    String[] strs = str.split(delimiter, 2);
    // Without the delimiter split() yields a single element, and reading
    // strs[1] would fail with ArrayIndexOutOfBoundsException.
    if (strs.length < 2) {
      throw new IllegalArgumentException(
          "Expected a name" + delimiter + "value pair but got: " + str);
    }
    return new NameValuePair(strs[0].trim(), strs[1].trim());
  }

  /**
   * Parses a list of pairs: <code>str</code> is split on
   * <code>aDelimiter</code> and each piece is parsed by
   * {@link #parsePairStr(String, String)} using <code>pDelimiter</code>.
   *
   * @return the pairs, or null when <code>str</code> is null
   * @throws IllegalArgumentException if any piece is not a valid pair
   */
  private static Collection<NameValuePair> parsePairsStr(
      String str, String aDelimiter, String pDelimiter) {
    if (str == null) {
      return null;
    }
    Set<NameValuePair> pairs = new HashSet<NameValuePair>();
    for (String aStr : str.split(aDelimiter)) {
      pairs.add(parsePairStr(aStr, pDelimiter));
    }
    return pairs;
  }

  /**
   * Parses a delimited list of field names, ignoring case and surrounding
   * whitespace. Recognized names are EVENTS, LASTEVENTONLY,
   * RELATEDENTITIES, PRIMARYFILTERS and OTHERINFO.
   *
   * @return the requested fields, or null when <code>str</code> is null or
   *         names no field at all
   * @throws IllegalArgumentException if a non-blank name is not recognized
   */
  private static EnumSet<Field> parseFieldsStr(String str, String delimiter) {
    if (str == null) {
      return null;
    }
    EnumSet<Field> fieldSet = EnumSet.noneOf(Field.class);
    for (String token : str.split(delimiter)) {
      // Locale.ROOT keeps the comparison independent of the JVM's default
      // locale (e.g. the Turkish dotless i).
      String name = token.trim().toUpperCase(Locale.ROOT);
      if (name.length() == 0) {
        // Blank tokens (e.g. "fields=" or a trailing comma) select nothing.
        continue;
      }
      if (name.equals("EVENTS")) {
        fieldSet.add(Field.EVENTS);
      } else if (name.equals("LASTEVENTONLY")) {
        fieldSet.add(Field.LAST_EVENT_ONLY);
      } else if (name.equals("RELATEDENTITIES")) {
        fieldSet.add(Field.RELATED_ENTITIES);
      } else if (name.equals("PRIMARYFILTERS")) {
        fieldSet.add(Field.PRIMARY_FILTERS);
      } else if (name.equals("OTHERINFO")) {
        fieldSet.add(Field.OTHER_INFO);
      } else {
        // Callers translate this into a "requested invalid field." reply.
        throw new IllegalArgumentException(
            "Requested nonexistent field " + name);
      }
    }
    return fieldSet.isEmpty() ? null : fieldSet;
  }

  /**
   * @return the trimmed string parsed as a long, or null when
   *         <code>str</code> is null
   * @throws NumberFormatException if the string is not a valid long
   */
  private static Long parseLongStr(String str) {
    return str == null ? null : Long.parseLong(str.trim());
  }

  /**
   * @return the trimmed string, or null when <code>str</code> is null
   */
  private static String parseStr(String str) {
    return str == null ? null : str.trim();
  }

}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestGenericObjectMapper.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.GenericObjectMapper;
import org.junit.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.junit.Assert.assertEquals;

/**
 * Unit tests for {@link GenericObjectMapper}: the reverse-ordered long
 * encoding and the generic object write/read round trip.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestGenericObjectMapper {

  /**
   * Runs the encoding checks over the extreme long values, zero, and a few
   * positive and negative values around byte boundaries.
   */
  @Test
  public void testEncoding() {
    testEncoding(Long.MAX_VALUE);
    testEncoding(Long.MIN_VALUE);
    testEncoding(0L);
    testEncoding(128L);
    testEncoding(256L);
    testEncoding(512L);
    testEncoding(-256L);
  }

  /**
   * Checks, for one value, that the reverse-ordered encoding decodes back
   * to the same long (at offset 0 and at a non-zero offset) and that the
   * encoded bytes of a smaller long compare greater than those of the next
   * larger long.
   */
  private static void testEncoding(long l) {
    byte[] b = GenericObjectMapper.writeReverseOrderedLong(l);
    assertEquals("error decoding", l,
        GenericObjectMapper.readReverseOrderedLong(b, 0));

    // Decode from the middle of a larger buffer to exercise the offset.
    byte[] buf = new byte[16];
    System.arraycopy(b, 0, buf, 5, 8);
    assertEquals("error decoding at offset", l,
        GenericObjectMapper.readReverseOrderedLong(buf, 5));

    // The guards avoid overflowing l-1 / l+1 at the extremes.
    if (l > Long.MIN_VALUE) {
      byte[] a = GenericObjectMapper.writeReverseOrderedLong(l - 1);
      assertEquals("error preserving ordering", 1,
          WritableComparator.compareBytes(a, 0, a.length, b, 0, b.length));
    }
    if (l < Long.MAX_VALUE) {
      byte[] c = GenericObjectMapper.writeReverseOrderedLong(l + 1);
      assertEquals("error preserving ordering", 1,
          WritableComparator.compareBytes(b, 0, b.length, c, 0, c.length));
    }
  }

  /**
   * Asserts that writing <code>o</code> and reading it back yields an equal
   * object.
   */
  private static void verify(Object o) throws IOException {
    assertEquals(o, GenericObjectMapper.read(GenericObjectMapper.write(o)));
  }

  /**
   * Round-trips a long, an int, a double, a string, a boolean, a list of
   * strings and a string-to-string map.
   */
  @Test
  public void testValueTypes() throws IOException {
    verify(42L);
    verify(42);
    verify(1.23);
    verify("abc");
    verify(true);
    List<String> list = new ArrayList<String>();
    list.add("123");
    list.add("abc");
    verify(list);
    Map<String, String> map = new HashMap<String, String>();
    map.put("k1", "v1");
    map.put("k2", "v2");
    verify(map);
  }

}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java?rev=1570922&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java (added) +++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java Sat Feb 22 20:55:06 2014 @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.hadoop.yarn.server.applicationhistoryservice.timeline;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Runs the shared timeline-store test cases inherited from
 * {@link TimelineStoreTestUtils} against a {@link LeveldbTimelineStore}
 * whose data lives in a scratch directory under <code>target</code>.
 */
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class TestLeveldbTimelineStore
    extends TimelineStoreTestUtils {
  private FileContext fsContext;
  private File fsPath;

  /**
   * Deletes any leftover scratch directory, points the store at it through
   * the configuration, starts the store and loads the test and
   * verification data.
   */
  @Before
  public void setup() throws Exception {
    fsContext = FileContext.getLocalFSFileContext();
    Configuration conf = new Configuration();
    fsPath = new File("target", this.getClass().getSimpleName() +
        "-tmpDir").getAbsoluteFile();
    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        fsPath.getAbsolutePath());
    store = new LeveldbTimelineStore();
    store.init(conf);
    store.start();
    loadTestData();
    loadVerificationData();
  }

  /**
   * Stops the store and removes the scratch directory.
   */
  @After
  public void tearDown() throws Exception {
    try {
      store.stop();
    } finally {
      // Remove the directory even when stop() fails, so that a failure in
      // one test does not leave stale data on disk for later runs.
      fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
    }
  }

  /**
   * Runs the inherited single-entity test twice, clearing the store's
   * start-time cache in between so the second pass runs without it.
   */
  @Override
  @Test
  public void testGetSingleEntity() throws IOException {
    super.testGetSingleEntity();
    ((LeveldbTimelineStore)store).clearStartTimeCache();
    super.testGetSingleEntity();
  }

  /** Runs the inherited test of the same name. */
  @Override
  @Test
  public void testGetEntities() throws IOException {
    super.testGetEntities();
  }

  /** Runs the inherited test of the same name. */
  @Override
  @Test
  public void testGetEntitiesWithPrimaryFilters() throws IOException {
    super.testGetEntitiesWithPrimaryFilters();
  }

  /** Runs the inherited test of the same name. */
  @Override
  @Test
  public void testGetEntitiesWithSecondaryFilters() throws IOException {
    super.testGetEntitiesWithSecondaryFilters();
  }

  /** Runs the inherited test of the same name. */
  @Override
  @Test
  public void testGetEvents() throws IOException {
    super.testGetEvents();
  }

}