hadoop-common-commits mailing list archives

From: zjs...@apache.org
Subject: [2/3] hadoop git commit: YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.
Date: Thu, 07 May 2015 17:09:32 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
new file mode 100644
index 0000000..8b6a51b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/RollingLevelDBTimelineStore.java
@@ -0,0 +1,1807 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
+import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
+import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
+
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
+import org.iq80.leveldb.WriteBatch;
+import org.nustaq.serialization.FSTConfiguration;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
+import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE;
+import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_TTL_MS;
+
+import static org.fusesource.leveldbjni.JniDBFactory.bytes;
+
+/**
+ * <p>
+ * An implementation of an application timeline store backed by leveldb.
+ * </p>
+ *
+ * <p>
+ * There are three sections of the db, the start time section, the entity
+ * section, and the indexed entity section.
+ * </p>
+ *
+ * <p>
+ * The start time section is used to retrieve the unique start time for a given
+ * entity. Its values each contain a start time while its keys are of the form:
+ * </p>
+ *
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id
+ * </pre>
+ *
+ * <p>
+ * The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities, and other info. The event
+ * entries have event info serialized into their values. The other info entries
+ * have values corresponding to the values of the other info name/value map for
+ * the entry (note the names are contained in the key). All other entries have
+ * empty values. The key structure is as follows:
+ * </p>
+ *
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     DOMAIN_ID_COLUMN
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ * </pre>
+ *
+ * <p>
+ * The indexed entity section contains a primary filter name and primary filter
+ * value as the prefix. Within a given name/value, entire entity entries are
+ * stored in the same format as described in the entity section above (below,
+ * "key" represents any one of the possible entity entry keys described above).
+ * </p>
+ *
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key
+ * </pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RollingLevelDBTimelineStore extends AbstractService implements
+    TimelineStore {
+  private static final Log LOG = LogFactory
+      .getLog(RollingLevelDBTimelineStore.class);
+  private static FSTConfiguration fstConf =
+      FSTConfiguration.createDefaultConfiguration();
+
+  static {
+    fstConf.setShareReferences(false);
+  }
+
+  @Private
+  @VisibleForTesting
+  static final String FILENAME = "leveldb-timeline-store";
+  static final String DOMAIN = "domain-ldb";
+  static final String ENTITY = "entity-ldb";
+  static final String INDEX = "indexes-ldb";
+  static final String STARTTIME = "starttime-ldb";
+  static final String OWNER = "owner-ldb";
+
+  private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
+  private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
+  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
+  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(UTF_8);
+  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(UTF_8);
+
+  private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(UTF_8);
+  private static final byte[] OWNER_COLUMN = "o".getBytes(UTF_8);
+  private static final byte[] READER_COLUMN = "r".getBytes(UTF_8);
+  private static final byte[] WRITER_COLUMN = "w".getBytes(UTF_8);
+  private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(UTF_8);
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
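+
+  // Editorial sketch, not part of this commit: a worked example of the key
+  // layout described in the class javadoc, assuming a hypothetical entity of
+  // type "YARN_APPLICATION" with id "app_1", start time t and an event at
+  // eventTs. Using the same KeyBuilder API this class uses below, the event
+  // entry key would be assembled as:
+  //
+  //   byte[] eventKey = KeyBuilder.newInstance()
+  //       .add("YARN_APPLICATION")                 // entity type
+  //       .add(writeReverseOrderedLong(t))         // descending start time
+  //       .add("app_1")                            // entity id
+  //       .add(EVENTS_COLUMN)                      // "e" column marker
+  //       .add(writeReverseOrderedLong(eventTs))   // descending event time
+  //       .add("APP_STARTED".getBytes(UTF_8))      // event type
+  //       .getBytes();
+  //
+  // The reverse-ordered longs make newer start times and newer events sort
+  // first when a db is scanned in key order.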
+
+  private static final String TIMELINE_STORE_VERSION_KEY =
+      "timeline-store-version";
+
+  private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
+
+  private static long writeBatchSize = 10000;
+
+  @Private
+  @VisibleForTesting
+  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+      .createImmutable((short) 0700);
+
+  private Map<EntityIdentifier, Long> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  private DB domaindb;
+  private RollingLevelDB entitydb;
+  private RollingLevelDB indexdb;
+  private DB starttimedb;
+  private DB ownerdb;
+
+  private Thread deletionThread;
+
+  public RollingLevelDBTimelineStore() {
+    super(RollingLevelDBTimelineStore.class.getName());
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void serviceInit(Configuration conf) throws Exception {
+    Preconditions
+        .checkArgument(conf.getLong(TIMELINE_SERVICE_TTL_MS,
+            DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
+            "%s property value should be greater than zero",
+            TIMELINE_SERVICE_TTL_MS);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
+        "%s property value should be greater than or equal to zero",
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
+        " %s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES);
+    Preconditions.checkArgument(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE) > 0,
+        "%s property value should be greater than zero",
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE);
+
+    Options options = new Options();
+    options.createIfMissing(true);
+    options.cacheSize(conf.getLong(
+        TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    JniDBFactory factory = new JniDBFactory();
+    Path dbPath = new Path(
+        conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
+    Path domainDBPath = new Path(dbPath, DOMAIN);
+    Path starttimeDBPath = new Path(dbPath, STARTTIME);
+    Path ownerDBPath = new Path(dbPath, OWNER);
+    FileSystem localFS = null;
+    try {
+      localFS = FileSystem.getLocal(conf);
+      if (!localFS.exists(dbPath)) {
+        if (!localFS.mkdirs(dbPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + dbPath);
+        }
+        localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(domainDBPath)) {
+        if (!localFS.mkdirs(domainDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + domainDBPath);
+        }
+        localFS.setPermission(domainDBPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(starttimeDBPath)) {
+        if (!localFS.mkdirs(starttimeDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + starttimeDBPath);
+        }
+        localFS.setPermission(starttimeDBPath, LEVELDB_DIR_UMASK);
+      }
+      if (!localFS.exists(ownerDBPath)) {
+        if (!localFS.mkdirs(ownerDBPath)) {
+          throw new IOException("Couldn't create directory for leveldb "
+              + "timeline store " + ownerDBPath);
+        }
+        localFS.setPermission(ownerDBPath, LEVELDB_DIR_UMASK);
+      }
+    } finally {
+      IOUtils.cleanup(LOG, localFS);
+    }
+    options.maxOpenFiles(conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_MAX_OPEN_FILES));
+    options.writeBufferSize(conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
+    LOG.info("Using leveldb path " + dbPath);
+    domaindb = factory.open(new File(domainDBPath.toString()), options);
+    entitydb = new RollingLevelDB(ENTITY);
+    entitydb.init(conf);
+    indexdb = new RollingLevelDB(INDEX);
+    indexdb.init(conf);
+    starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
+    ownerdb = factory.open(new File(ownerDBPath.toString()), options);
+    checkVersion();
+    startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
+        getStartTimeWriteCacheSize(conf)));
+    startTimeReadCache = Collections.synchronizedMap(new LRUMap(
+        getStartTimeReadCacheSize(conf)));
+
+    writeBatchSize = conf.getInt(
+        TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE,
+        DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BATCH_SIZE);
+
+    super.serviceInit(conf);
+  }
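+
+  // Editorial sketch, not part of this commit: a minimal way to stand up the
+  // store for a test, assuming a throwaway local directory; the property name
+  // is referenced via the statically imported YarnConfiguration constant and
+  // the Configuration instance is assumed to carry the YARN defaults.
+  //
+  //   Configuration conf = new YarnConfiguration();
+  //   conf.set(TIMELINE_SERVICE_LEVELDB_PATH, "/tmp/timeline-store-test");
+  //   RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
+  //   store.init(conf);   // runs the precondition checks and opens the dbs
+  //   store.start();      // launches the TTL deletion thread if enabled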
+  
+  @Override
+  protected void serviceStart() throws Exception {
+    if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
+      deletionThread = new EntityDeletionThread(getConfig());
+      deletionThread.start();
+    }
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete,"
+            + " closing db now", e);
+      }
+    }
+    IOUtils.cleanup(LOG, domaindb);
+    IOUtils.cleanup(LOG, starttimedb);
+    IOUtils.cleanup(LOG, ownerdb);
+    entitydb.stop();
+    indexdb.stop();
+    super.serviceStop();
+  }
+
+  private class EntityDeletionThread extends Thread {
+    private final long ttl;
+    private final long ttlInterval;
+
+    public EntityDeletionThread(Configuration conf) {
+      ttl = conf.getLong(TIMELINE_SERVICE_TTL_MS,
+          DEFAULT_TIMELINE_SERVICE_TTL_MS);
+      ttlInterval = conf.getLong(
+          TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+          DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle "
+          + "interval " + ttlInterval);
+    }
+
+    @Override
+    public void run() {
+      Thread.currentThread().setName("Leveldb Timeline Store Retention");
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          discardOldEntities(timestamp);
+          Thread.sleep(ttlInterval);
+        } catch (IOException e) {
+          LOG.error(e);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fields) throws IOException {
+    Long revStartTime = getStartTimeLong(entityId, entityType);
+    if (revStartTime == null) {
+      return null;
+    }
+    byte[] prefix = KeyBuilder.newInstance().add(entityType)
+        .add(writeReverseOrderedLong(revStartTime)).add(entityId)
+        .getBytesForLookup();
+
+    DBIterator iterator = null;
+    try {
+      DB db = entitydb.getDBForStartTime(revStartTime);
+      if (db == null) {
+        return null;
+      }
+      iterator = db.iterator();
+      iterator.seek(prefix);
+
+      return getEntity(entityId, entityType, revStartTime, fields, iterator,
+          prefix, prefix.length);
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
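+
+  // Editorial sketch, not part of this commit: fetching a single entity,
+  // reusing the hypothetical id and type from the key-layout sketch above.
+  //
+  //   TimelineEntity e = store.getEntity("app_1", "YARN_APPLICATION",
+  //       EnumSet.of(Field.EVENTS, Field.PRIMARY_FILTERS));
+  //   // e is null if no start time is recorded for the entity or if the
+  //   // rolling db holding its start time has already been rolled away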
+
+  /**
+   * Read an entity from a db iterator, populating only the requested fields;
+   * fields that were not requested are explicitly nulled out on the returned
+   * entity.
+   */
+  private static TimelineEntity getEntity(String entityId, String entityType,
+      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      byte[] prefix, int prefixlen) throws IOException {
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    TimelineEntity entity = new TimelineEntity();
+    boolean events = false;
+    boolean lastEvent = false;
+    if (fields.contains(Field.EVENTS)) {
+      events = true;
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+      lastEvent = true;
+    } else {
+      entity.setEvents(null);
+    }
+    boolean relatedEntities = false;
+    if (fields.contains(Field.RELATED_ENTITIES)) {
+      relatedEntities = true;
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    boolean primaryFilters = false;
+    if (fields.contains(Field.PRIMARY_FILTERS)) {
+      primaryFilters = true;
+    } else {
+      entity.setPrimaryFilters(null);
+    }
+    boolean otherInfo = false;
+    if (fields.contains(Field.OTHER_INFO)) {
+      otherInfo = true;
+    } else {
+      entity.setOtherInfo(null);
+    }
+
+    // iterate through the entity's entry, parsing information if it is part
+    // of a requested field
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefixlen, key)) {
+        break;
+      }
+      if (key.length == prefixlen) {
+        continue;
+      }
+      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+        if (primaryFilters) {
+          addPrimaryFilter(entity, key, prefixlen
+              + PRIMARY_FILTERS_COLUMN.length);
+        }
+      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+        if (otherInfo) {
+          entity.addOtherInfo(
+              parseRemainingKey(key, prefixlen + OTHER_INFO_COLUMN.length),
+              fstConf.asObject(iterator.peekNext().getValue()));
+        }
+      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+        if (relatedEntities) {
+          addRelatedEntity(entity, key, prefixlen
+              + RELATED_ENTITIES_COLUMN.length);
+        }
+      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+        if (events || (lastEvent && entity.getEvents().size() == 0)) {
+          TimelineEvent event = getEntityEvent(null, key, prefixlen
+              + EVENTS_COLUMN.length, iterator.peekNext().getValue());
+          if (event != null) {
+            entity.addEvent(event);
+          }
+        }
+      } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
+        byte[] v = iterator.peekNext().getValue();
+        String domainId = new String(v, UTF_8);
+        entity.setDomainId(domainId);
+      } else {
+        LOG.warn(String.format("Found unexpected column for entity %s of "
+            + "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+      }
+    }
+
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+
+    return entity;
+  }
+
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventType) throws IOException {
+    TimelineEvents events = new TimelineEvents();
+    if (entityIds == null || entityIds.isEmpty()) {
+      return events;
+    }
+    // create a lexicographically-ordered map from start time to entities
+    Map<byte[], List<EntityIdentifier>> startTimeMap =
+        new TreeMap<byte[], List<EntityIdentifier>>(
+        new Comparator<byte[]>() {
+          @Override
+          public int compare(byte[] o1, byte[] o2) {
+            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+                o2.length);
+          }
+        });
+    DBIterator iterator = null;
+    try {
+      // look up start times for the specified entities
+      // skip entities with no start time
+      for (String entityId : entityIds) {
+        byte[] startTime = getStartTime(entityId, entityType);
+        if (startTime != null) {
+          List<EntityIdentifier> entities = startTimeMap.get(startTime);
+          if (entities == null) {
+            entities = new ArrayList<EntityIdentifier>();
+            startTimeMap.put(startTime, entities);
+          }
+          entities.add(new EntityIdentifier(entityId, entityType));
+        }
+      }
+      for (Entry<byte[], List<EntityIdentifier>> entry : startTimeMap
+          .entrySet()) {
+        // look up the events matching the given parameters (limit,
+        // start time, end time, event types) for entities whose start times
+        // were found and add the entities to the return list
+        byte[] revStartTime = entry.getKey();
+        for (EntityIdentifier entityIdentifier : entry.getValue()) {
+          EventsOfOneEntity entity = new EventsOfOneEntity();
+          entity.setEntityId(entityIdentifier.getId());
+          entity.setEntityType(entityType);
+          events.addEvent(entity);
+          KeyBuilder kb = KeyBuilder.newInstance().add(entityType)
+              .add(revStartTime).add(entityIdentifier.getId())
+              .add(EVENTS_COLUMN);
+          byte[] prefix = kb.getBytesForLookup();
+          if (windowEnd == null) {
+            windowEnd = Long.MAX_VALUE;
+          }
+          byte[] revts = writeReverseOrderedLong(windowEnd);
+          kb.add(revts);
+          byte[] first = kb.getBytesForLookup();
+          byte[] last = null;
+          if (windowStart != null) {
+            last = KeyBuilder.newInstance().add(prefix)
+                .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+          }
+          if (limit == null) {
+            limit = DEFAULT_LIMIT;
+          }
+          DB db = entitydb.getDBForStartTime(readReverseOrderedLong(
+              revStartTime, 0));
+          if (db == null) {
+            continue;
+          }
+          iterator = db.iterator();
+          for (iterator.seek(first); entity.getEvents().size() < limit
+              && iterator.hasNext(); iterator.next()) {
+            byte[] key = iterator.peekNext().getKey();
+            if (!prefixMatches(prefix, prefix.length, key)
+                || (last != null && WritableComparator.compareBytes(key, 0,
+                    key.length, last, 0, last.length) > 0)) {
+              break;
+            }
+            TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
+                iterator.peekNext().getValue());
+            if (event != null) {
+              entity.addEvent(event);
+            }
+          }
+        }
+      }
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+    return events;
+  }
+
+  @Override
+  public TimelineEntities getEntities(String entityType, Long limit,
+      Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
+    if (primaryFilter == null) {
+      // if no primary filter is specified, prefix the lookup with
+      // ENTITY_ENTRY_PREFIX
+      return getEntityByTime(EMPTY_BYTES, entityType, limit, windowStart,
+          windowEnd, fromId, fromTs, secondaryFilters, fields, checkAcl, false);
+    } else {
+      // if a primary filter is specified, prefix the lookup with
+      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+      // ENTITY_ENTRY_PREFIX
+      byte[] base = KeyBuilder.newInstance().add(primaryFilter.getName())
+          .add(fstConf.asByteArray(primaryFilter.getValue()), true)
+          .getBytesForLookup();
+      return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+          fromId, fromTs, secondaryFilters, fields, checkAcl, true);
+    }
+  }
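+
+  // Editorial sketch, not part of this commit: a query using a hypothetical
+  // "user" primary filter, assuming NameValuePair's (name, value) constructor.
+  // With a null primary filter the entity db is scanned directly; with a
+  // non-null one the index db is scanned instead, as described above.
+  //
+  //   TimelineEntities found = store.getEntities("YARN_APPLICATION", 100L,
+  //       null, null, null, null, new NameValuePair("user", "alice"),
+  //       null, EnumSet.allOf(Field.class), null);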
+
+  /**
+   * Retrieves a list of entities satisfying given parameters.
+   *
+   * @param base
+   *          A byte array prefix for the lookup
+   * @param entityType
+   *          The type of the entity
+   * @param limit
+   *          A limit on the number of entities to return
+   * @param starttime
+   *          The earliest entity start time to retrieve (exclusive)
+   * @param endtime
+   *          The latest entity start time to retrieve (inclusive)
+   * @param fromId
+   *          Retrieve entities starting with this entity
+   * @param fromTs
+   *          Ignore entities with insert timestamp later than this ts
+   * @param secondaryFilters
+   *          Filter pairs that the entities should match
+   * @param fields
+   *          The set of fields to retrieve
+   * @param usingPrimaryFilter
+   *          true if this query is using a primary filter
+   * @return A list of entities
+   * @throws IOException
+   */
+  private TimelineEntities getEntityByTime(byte[] base, String entityType,
+      Long limit, Long starttime, Long endtime, String fromId, Long fromTs,
+      Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields,
+      CheckAcl checkAcl, boolean usingPrimaryFilter) throws IOException {
+    DBIterator iterator = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+      // only db keys matching the prefix (base + entity type) will be parsed
+      byte[] prefix = kb.getBytesForLookup();
+      if (endtime == null) {
+        // if end time is null, place no restriction on end time
+        endtime = Long.MAX_VALUE;
+      }
+
+      // Sanitize the fields parameter
+      if (fields == null) {
+        fields = EnumSet.allOf(Field.class);
+      }
+
+      // construct a first key that will be seeked to using end time or fromId
+      long firstStartTime = Long.MAX_VALUE;
+      byte[] first = null;
+      if (fromId != null) {
+        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+        if (fromIdStartTime == null) {
+          // no start time for provided id, so return empty entities
+          return new TimelineEntities();
+        }
+        if (fromIdStartTime <= endtime) {
+          // if provided id's start time falls before the end of the window,
+          // use it to construct the seek key
+          firstStartTime = fromIdStartTime;
+          first = kb.add(writeReverseOrderedLong(fromIdStartTime)).add(fromId)
+              .getBytesForLookup();
+        }
+      }
+      // if seek key wasn't constructed using fromId, construct it using end ts
+      if (first == null) {
+        firstStartTime = endtime;
+        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+      }
+      byte[] last = null;
+      if (starttime != null) {
+        // if start time is not null, set a last key that will not be
+        // iterated past
+        last = KeyBuilder.newInstance().add(base).add(entityType)
+            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+      }
+      if (limit == null) {
+        // if limit is not specified, use the default
+        limit = DEFAULT_LIMIT;
+      }
+
+      TimelineEntities entities = new TimelineEntities();
+      RollingLevelDB rollingdb = null;
+      if (usingPrimaryFilter) {
+        rollingdb = indexdb;
+      } else {
+        rollingdb = entitydb;
+      }
+
+      DB db = rollingdb.getDBForStartTime(firstStartTime);
+      while (entities.getEntities().size() < limit && db != null) {
+        iterator = db.iterator();
+        iterator.seek(first);
+
+        // iterate until one of the following conditions is met: limit is
+        // reached, there are no more keys, the key prefix no longer matches,
+        // or a start time has been specified and reached/exceeded
+        while (entities.getEntities().size() < limit && iterator.hasNext()) {
+          byte[] key = iterator.peekNext().getKey();
+          if (!prefixMatches(prefix, prefix.length, key)
+              || (last != null && WritableComparator.compareBytes(key, 0,
+                  key.length, last, 0, last.length) > 0)) {
+            break;
+          }
+          // read the start time and entity id from the current key
+          KeyParser kp = new KeyParser(key, prefix.length);
+          Long startTime = kp.getNextLong();
+          String entityId = kp.getNextString();
+
+          if (fromTs != null) {
+            long insertTime = readReverseOrderedLong(iterator.peekNext()
+                .getValue(), 0);
+            if (insertTime > fromTs) {
+              byte[] firstKey = key;
+              while (iterator.hasNext()) {
+                key = iterator.peekNext().getKey();
+                iterator.next();
+                if (!prefixMatches(firstKey, kp.getOffset(), key)) {
+                  break;
+                }
+              }
+              continue;
+            }
+          }
+          // Even if other info and primary filter fields are not included, we
+          // still need to load them to match secondary filters when they are
+          // non-empty
+          EnumSet<Field> queryFields = EnumSet.copyOf(fields);
+          boolean addPrimaryFilters = false;
+          boolean addOtherInfo = false;
+          if (secondaryFilters != null && secondaryFilters.size() > 0) {
+            if (!queryFields.contains(Field.PRIMARY_FILTERS)) {
+              queryFields.add(Field.PRIMARY_FILTERS);
+              addPrimaryFilters = true;
+            }
+            if (!queryFields.contains(Field.OTHER_INFO)) {
+              queryFields.add(Field.OTHER_INFO);
+              addOtherInfo = true;
+            }
+          }
+
+          // parse the entity that owns this key, iterating over all keys for
+          // the entity
+          TimelineEntity entity = null;
+          if (usingPrimaryFilter) {
+            entity = getEntity(entityId, entityType, queryFields);
+            iterator.next();
+          } else {
+            entity = getEntity(entityId, entityType, startTime, queryFields,
+                iterator, key, kp.getOffset());
+          }
+          // determine if the retrieved entity matches the provided secondary
+          // filters, and if so add it to the list of entities to return
+          boolean filterPassed = true;
+          if (secondaryFilters != null) {
+            for (NameValuePair filter : secondaryFilters) {
+              Object v = entity.getOtherInfo().get(filter.getName());
+              if (v == null) {
+                Set<Object> vs = entity.getPrimaryFilters()
+                    .get(filter.getName());
+                if (vs == null || !vs.contains(filter.getValue())) {
+                  filterPassed = false;
+                  break;
+                }
+              } else if (!v.equals(filter.getValue())) {
+                filterPassed = false;
+                break;
+              }
+            }
+          }
+          if (filterPassed) {
+            if (entity.getDomainId() == null) {
+              entity.setDomainId(DEFAULT_DOMAIN_ID);
+            }
+            if (checkAcl == null || checkAcl.check(entity)) {
+              // Remove primary filter and other info if they are added for
+              // matching secondary filters
+              if (addPrimaryFilters) {
+                entity.setPrimaryFilters(null);
+              }
+              if (addOtherInfo) {
+                entity.setOtherInfo(null);
+              }
+              entities.addEntity(entity);
+            }
+          }
+        }
+        db = rollingdb.getPreviousDB(db);
+      }
+      return entities;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Put a single entity. If there is an error, add a TimelinePutError to the
+   * given response.
+   *
+   * @param entityUpdates
+   *          a map containing all the scheduled writes for this put to the
+   *          entity db
+   * @param indexUpdates
+   *          a map containing all the scheduled writes for this put to the
+   *          index db
+   */
+  private long putEntities(TreeMap<Long, RollingWriteBatch> entityUpdates,
+      TreeMap<Long, RollingWriteBatch> indexUpdates, TimelineEntity entity,
+      TimelinePutResponse response) {
+
+    long putCount = 0;
+    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
+        new ArrayList<EntityIdentifier>();
+    byte[] revStartTime = null;
+    Map<String, Set<Object>> primaryFilters = null;
+    try {
+      List<TimelineEvent> events = entity.getEvents();
+      // look up the start time for the entity
+      Long startTime = getAndSetStartTime(entity.getEntityId(),
+          entity.getEntityType(), entity.getStartTime(), events);
+      if (startTime == null) {
+        // if no start time is found, add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_START_TIME);
+        response.addError(error);
+        return putCount;
+      }
+
+      // Must have a domain
+      if (StringUtils.isEmpty(entity.getDomainId())) {
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_DOMAIN);
+        response.addError(error);
+        return putCount;
+      }
+
+      revStartTime = writeReverseOrderedLong(startTime);
+      long roundedStartTime = entitydb.computeCurrentCheckMillis(startTime);
+      RollingWriteBatch rollingWriteBatch = entityUpdates.get(roundedStartTime);
+      if (rollingWriteBatch == null) {
+        DB db = entitydb.getDBForStartTime(startTime);
+        if (db != null) {
+          WriteBatch writeBatch = db.createWriteBatch();
+          rollingWriteBatch = new RollingWriteBatch(db, writeBatch);
+          entityUpdates.put(roundedStartTime, rollingWriteBatch);
+        }
+      }
+      if (rollingWriteBatch == null) {
+        // no rolling db exists for this start time, so the entity has
+        // expired; add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+        response.addError(error);
+        return putCount;
+      }
+      WriteBatch writeBatch = rollingWriteBatch.getWriteBatch();
+
+      // Save off the getBytes conversion to avoid unnecessary cost
+      byte[] entityIdBytes = entity.getEntityId().getBytes(UTF_8);
+      byte[] entityTypeBytes = entity.getEntityType().getBytes(UTF_8);
+      byte[] domainIdBytes = entity.getDomainId().getBytes(UTF_8);
+
+      // write entity marker
+      byte[] markerKey = KeyBuilder.newInstance(3).add(entityTypeBytes, true)
+          .add(revStartTime).add(entityIdBytes, true).getBytesForLookup();
+      writeBatch.put(markerKey, EMPTY_BYTES);
+      ++putCount;
+
+      // write domain id entry
+      byte[] domainkey = KeyBuilder.newInstance(4).add(entityTypeBytes, true)
+          .add(revStartTime).add(entityIdBytes, true).add(DOMAIN_ID_COLUMN)
+          .getBytes();
+      writeBatch.put(domainkey, domainIdBytes);
+      ++putCount;
+
+      // write event entries
+      if (events != null) {
+        for (TimelineEvent event : events) {
+          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+          byte[] key = KeyBuilder.newInstance().add(entityTypeBytes, true)
+              .add(revStartTime).add(entityIdBytes, true).add(EVENTS_COLUMN)
+              .add(revts).add(event.getEventType().getBytes(UTF_8)).getBytes();
+          byte[] value = fstConf.asByteArray(event.getEventInfo());
+          writeBatch.put(key, value);
+          ++putCount;
+        }
+      }
+
+      // write primary filter entries
+      primaryFilters = entity.getPrimaryFilters();
+      if (primaryFilters != null) {
+        for (Entry<String, Set<Object>> primaryFilter : primaryFilters
+            .entrySet()) {
+          for (Object primaryFilterValue : primaryFilter.getValue()) {
+            byte[] key = KeyBuilder.newInstance(6).add(entityTypeBytes, true)
+                .add(revStartTime).add(entityIdBytes, true)
+                .add(PRIMARY_FILTERS_COLUMN).add(primaryFilter.getKey())
+                .add(fstConf.asByteArray(primaryFilterValue)).getBytes();
+            writeBatch.put(key, EMPTY_BYTES);
+            ++putCount;
+          }
+        }
+      }
+
+      // write other info entries
+      Map<String, Object> otherInfo = entity.getOtherInfo();
+      if (otherInfo != null) {
+        for (Entry<String, Object> info : otherInfo.entrySet()) {
+          byte[] key = KeyBuilder.newInstance(5).add(entityTypeBytes, true)
+              .add(revStartTime).add(entityIdBytes, true)
+              .add(OTHER_INFO_COLUMN).add(info.getKey()).getBytes();
+          byte[] value = fstConf.asByteArray(info.getValue());
+          writeBatch.put(key, value);
+          ++putCount;
+        }
+      }
+
+      // write related entity entries
+      Map<String, Set<String>> relatedEntities = entity.getRelatedEntities();
+      if (relatedEntities != null) {
+        for (Entry<String, Set<String>> relatedEntityList : relatedEntities
+            .entrySet()) {
+          String relatedEntityType = relatedEntityList.getKey();
+          for (String relatedEntityId : relatedEntityList.getValue()) {
+            // look up start time of related entity
+            Long relatedStartTimeLong = getStartTimeLong(relatedEntityId,
+                relatedEntityType);
+            // delay writing the related entity if no start time is found
+            if (relatedStartTimeLong == null) {
+              relatedEntitiesWithoutStartTimes.add(new EntityIdentifier(
+                  relatedEntityId, relatedEntityType));
+              continue;
+            }
+
+            byte[] relatedEntityStartTime =
+                writeReverseOrderedLong(relatedStartTimeLong);
+            long relatedRoundedStartTime = entitydb
+                .computeCurrentCheckMillis(relatedStartTimeLong);
+            RollingWriteBatch relatedRollingWriteBatch = entityUpdates
+                .get(relatedRoundedStartTime);
+            if (relatedRollingWriteBatch == null) {
+              DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
+              if (db != null) {
+                WriteBatch relatedWriteBatch = db.createWriteBatch();
+                relatedRollingWriteBatch = new RollingWriteBatch(db,
+                    relatedWriteBatch);
+                entityUpdates.put(relatedRoundedStartTime,
+                    relatedRollingWriteBatch);
+              }
+            }
+            if (relatedRollingWriteBatch == null) {
+              // no rolling db exists for the related entity's start time, so
+              // it has expired; add an error and skip this relation
+              TimelinePutError error = new TimelinePutError();
+              error.setEntityId(entity.getEntityId());
+              error.setEntityType(entity.getEntityType());
+              error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+              response.addError(error);
+              continue;
+            }
+            // This is the existing entity
+            byte[] relatedDomainIdBytes = relatedRollingWriteBatch.getDB().get(
+                createDomainIdKey(relatedEntityId, relatedEntityType,
+                    relatedEntityStartTime));
+            // The timeline data created by the server before 2.6 won't have
+            // the domain field. We assume this timeline data is in the
+            // default timeline domain.
+            String domainId = null;
+            if (relatedDomainIdBytes == null) {
+              domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
+            } else {
+              domainId = new String(relatedDomainIdBytes, UTF_8);
+            }
+            if (!domainId.equals(entity.getDomainId())) {
+              // in this case the entity will be put, but the relation will be
+              // ignored
+              TimelinePutError error = new TimelinePutError();
+              error.setEntityId(entity.getEntityId());
+              error.setEntityType(entity.getEntityType());
+              error.setErrorCode(TimelinePutError.FORBIDDEN_RELATION);
+              response.addError(error);
+              continue;
+            }
+            // write "forward" entry (related entity -> entity)
+            byte[] key = createRelatedEntityKey(relatedEntityId,
+                relatedEntityType, relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType());
+            WriteBatch relatedWriteBatch = relatedRollingWriteBatch
+                .getWriteBatch();
+            relatedWriteBatch.put(key, EMPTY_BYTES);
+            ++putCount;
+          }
+        }
+      }
+
+      // write index entities
+      RollingWriteBatch indexRollingWriteBatch = indexUpdates
+          .get(roundedStartTime);
+      if (indexRollingWriteBatch == null) {
+        DB db = indexdb.getDBForStartTime(startTime);
+        if (db != null) {
+          WriteBatch indexWriteBatch = db.createWriteBatch();
+          indexRollingWriteBatch = new RollingWriteBatch(db, indexWriteBatch);
+          indexUpdates.put(roundedStartTime, indexRollingWriteBatch);
+        }
+      }
+      if (indexRollingWriteBatch == null) {
+        // no rolling index db exists for this start time, so the entity has
+        // expired; add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+        response.addError(error);
+        return putCount;
+      }
+      WriteBatch indexWriteBatch = indexRollingWriteBatch.getWriteBatch();
+      putCount += writePrimaryFilterEntries(indexWriteBatch, primaryFilters,
+          markerKey, EMPTY_BYTES);
+    } catch (IOException e) {
+      LOG.error("Error putting entity " + entity.getEntityId() + " of type "
+          + entity.getEntityType(), e);
+      TimelinePutError error = new TimelinePutError();
+      error.setEntityId(entity.getEntityId());
+      error.setEntityType(entity.getEntityType());
+      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+      response.addError(error);
+    }
+
+    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
+      try {
+        Long relatedEntityStartAndInsertTime = getAndSetStartTime(
+            relatedEntity.getId(), relatedEntity.getType(),
+            readReverseOrderedLong(revStartTime, 0), null);
+        if (relatedEntityStartAndInsertTime == null) {
+          throw new IOException("Error setting start time for related entity");
+        }
+        long relatedStartTimeLong = relatedEntityStartAndInsertTime;
+        long relatedRoundedStartTime = entitydb
+            .computeCurrentCheckMillis(relatedStartTimeLong);
+        RollingWriteBatch relatedRollingWriteBatch = entityUpdates
+            .get(relatedRoundedStartTime);
+        if (relatedRollingWriteBatch == null) {
+          DB db = entitydb.getDBForStartTime(relatedStartTimeLong);
+          if (db != null) {
+            WriteBatch relatedWriteBatch = db.createWriteBatch();
+            relatedRollingWriteBatch = new RollingWriteBatch(db,
+                relatedWriteBatch);
+            entityUpdates
+                .put(relatedRoundedStartTime, relatedRollingWriteBatch);
+          }
+        }
+        if (relatedRollingWriteBatch == null) {
+          // no rolling db exists for the related entity's start time, so it
+          // has expired; add an error and skip it
+          TimelinePutError error = new TimelinePutError();
+          error.setEntityId(entity.getEntityId());
+          error.setEntityType(entity.getEntityType());
+          error.setErrorCode(TimelinePutError.EXPIRED_ENTITY);
+          response.addError(error);
+          continue;
+        }
+        WriteBatch relatedWriteBatch = relatedRollingWriteBatch.getWriteBatch();
+        byte[] relatedEntityStartTime =
+            writeReverseOrderedLong(relatedEntityStartAndInsertTime);
+        // This is the new entity, the domain should be the same
+        byte[] key = createDomainIdKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime);
+        relatedWriteBatch.put(key, entity.getDomainId().getBytes(UTF_8));
+        ++putCount;
+        relatedWriteBatch.put(
+            createRelatedEntityKey(relatedEntity.getId(),
+                relatedEntity.getType(), relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
+        ++putCount;
+        relatedWriteBatch.put(
+            createEntityMarkerKey(relatedEntity.getId(),
+                relatedEntity.getType(), relatedEntityStartTime), EMPTY_BYTES);
+        ++putCount;
+      } catch (IOException e) {
+        LOG.error(
+            "Error putting related entity " + relatedEntity.getId()
+                + " of type " + relatedEntity.getType() + " for entity "
+                + entity.getEntityId() + " of type " + entity.getEntityType(),
+            e);
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+        response.addError(error);
+      }
+    }
+
+    return putCount;
+  }
+
+  /**
+   * For a given key / value pair that has been written to the db, write
+   * additional entries to the db for each primary filter.
+   */
+  private static long writePrimaryFilterEntries(WriteBatch writeBatch,
+      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+      throws IOException {
+    long putCount = 0;
+    if (primaryFilters != null) {
+      for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
+        for (Object pfval : pf.getValue()) {
+          writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, key), value);
+          ++putCount;
+        }
+      }
+    }
+    return putCount;
+  }
+
+  @Override
+  public TimelinePutResponse put(TimelineEntities entities) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Starting put");
+    }
+    TimelinePutResponse response = new TimelinePutResponse();
+    TreeMap<Long, RollingWriteBatch> entityUpdates =
+        new TreeMap<Long, RollingWriteBatch>();
+    TreeMap<Long, RollingWriteBatch> indexUpdates =
+        new TreeMap<Long, RollingWriteBatch>();
+
+    long entityCount = 0;
+    long indexCount = 0;
+
+    try {
+
+      for (TimelineEntity entity : entities.getEntities()) {
+        entityCount += putEntities(entityUpdates, indexUpdates, entity,
+            response);
+      }
+
+      for (RollingWriteBatch entityUpdate : entityUpdates.values()) {
+        entityUpdate.write();
+      }
+
+      for (RollingWriteBatch indexUpdate : indexUpdates.values()) {
+        indexUpdate.write();
+      }
+
+    } finally {
+
+      for (RollingWriteBatch entityRollingWriteBatch : entityUpdates.values()) {
+        entityRollingWriteBatch.close();
+      }
+      for (RollingWriteBatch indexRollingWriteBatch : indexUpdates.values()) {
+        indexRollingWriteBatch.close();
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Put " + entityCount + " new leveldb entity entries and "
+          + indexCount + " new leveldb index entries from "
+          + entities.getEntities().size() + " timeline entities");
+    }
+    return response;
+  }
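+
+  // Editorial sketch, not part of this commit: writing one entity through the
+  // batched put path above, with hypothetical ids and the default domain.
+  //
+  //   TimelineEntity entity = new TimelineEntity();
+  //   entity.setEntityId("app_1");
+  //   entity.setEntityType("YARN_APPLICATION");
+  //   entity.setStartTime(System.currentTimeMillis());
+  //   entity.setDomainId(DEFAULT_DOMAIN_ID);   // entities must carry a domain
+  //   TimelineEntities batch = new TimelineEntities();
+  //   batch.addEntity(entity);
+  //   TimelinePutResponse resp = store.put(batch);
+  //   // resp.getErrors() reports entities that could not be written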
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts the
+   * timestamps in reverse order (see
+   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}).
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @return A byte array, null if not found
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    Long l = getStartTimeLong(entityId, entityType);
+    return l == null ? null : writeReverseOrderedLong(l);
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long.
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @return A Long, null if not found
+   * @throws IOException
+   */
+  private Long getStartTimeLong(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // start time is not provided, so try to look it up
+    if (startTimeReadCache.containsKey(entity)) {
+      // found the start time in the cache
+      return startTimeReadCache.get(entity);
+    } else {
+      // try to look up the start time in the db
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      byte[] v = starttimedb.get(b);
+      if (v == null) {
+        // did not find the start time in the db
+        return null;
+      } else {
+        // found the start time in the db
+        Long l = readReverseOrderedLong(v, 0);
+        startTimeReadCache.put(entity, l);
+        return l;
+      }
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts the
+   * timestamps in reverse order (see
+   * {@link GenericObjectMapper#writeReverseOrderedLong(long)}). If the start
+   * time doesn't exist, set it based on the information provided. Should only
+   * be called when a lock has been obtained on the entity.
+   *
+   * @param entityId
+   *          The id of the entity
+   * @param entityType
+   *          The type of the entity
+   * @param startTime
+   *          The start time of the entity, or null
+   * @param events
+   *          A list of events for the entity, or null
+   * @return The start time as a Long, or null if it could not be determined
+   * @throws IOException
+   */
+  private Long getAndSetStartTime(String entityId, String entityType,
+      Long startTime, List<TimelineEvent> events) throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    Long time = startTimeWriteCache.get(entity);
+    if (time != null) {
+      // return the value in the cache
+      return time;
+    }
+    if (startTime == null && events != null) {
+      // calculate best guess start time based on lowest event time
+      startTime = Long.MAX_VALUE;
+      for (TimelineEvent e : events) {
+        if (e.getTimestamp() < startTime) {
+          startTime = e.getTimestamp();
+        }
+      }
+    }
+    // check the provided start time matches the db
+    return checkStartTimeInDb(entity, startTime);
+  }
+
+  /**
+   * Checks db for start time and returns it if it exists. If it doesn't exist,
+   * writes the suggested start time (if it is not null). This is only called
+   * when the start time is not found in the cache, so it adds it back into the
+   * cache if it is found. Should only be called when a lock has been obtained
+   * on the entity.
+   */
+  private Long checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime) throws IOException {
+    Long startAndInsertTime = null;
+    // create lookup key for start time
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    // retrieve value for key
+    byte[] v = starttimedb.get(b);
+    if (v == null) {
+      // start time doesn't exist in db
+      if (suggestedStartTime == null) {
+        return null;
+      }
+      startAndInsertTime = suggestedStartTime;
+
+      // write suggested start time
+      starttimedb.put(b, writeReverseOrderedLong(suggestedStartTime));
+    } else {
+      // found start time in db, so ignore suggested start time
+      startAndInsertTime = readReverseOrderedLong(v, 0);
+    }
+    startTimeWriteCache.put(entity, startAndInsertTime);
+    startTimeReadCache.put(entity, startAndInsertTime);
+    return startAndInsertTime;
+  }
+
+  /**
+   * Creates a key for looking up the start time of a given entity, of the form
+   * START_TIME_LOOKUP_PREFIX + entity type + entity id.
+   */
+  private static byte[] createStartTimeLookupKey(String entityId,
+      String entityType) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(entityId).getBytes();
+  }
+
+  /**
+   * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
+   * revstarttime + entity id.
+   */
+  private static byte[] createEntityMarkerKey(String entityId,
+      String entityType, byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).getBytesForLookup();
+  }
+
+  /**
+   * Creates an index entry for the given key of the form INDEXED_ENTRY_PREFIX +
+   * primaryfiltername + primaryfiltervalue + key.
+   */
+  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+      Object primaryFilterValue, byte[] key) throws IOException {
+    return KeyBuilder.newInstance().add(primaryFilterName)
+        .add(fstConf.asByteArray(primaryFilterValue), true).add(key).getBytes();
+  }
+
+  /**
+   * Creates an event object from the given key, offset, and value. If the event
+   * type is not contained in the specified set of event types, returns null.
+   */
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
+      byte[] key, int offset, byte[] value) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    long ts = kp.getNextLong();
+    String tstype = kp.getNextString();
+    if (eventTypes == null || eventTypes.contains(tstype)) {
+      TimelineEvent event = new TimelineEvent();
+      event.setTimestamp(ts);
+      event.setEventType(tstype);
+      Object o = fstConf.asObject(value);
+      if (o == null) {
+        event.setEventInfo(null);
+      } else if (o instanceof Map) {
+        @SuppressWarnings("unchecked")
+        Map<String, Object> m = (Map<String, Object>) o;
+        event.setEventInfo(m);
+      } else {
+        throw new IOException("Couldn't deserialize event info map");
+      }
+      return event;
+    }
+    return null;
+  }
+
+  /**
+   * Parses the primary filter from the given key at the given offset and adds
+   * it to the given entity.
+   */
+  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String name = kp.getNextString();
+    byte[] bytes = kp.getRemainingBytes();
+    Object value = fstConf.asObject(bytes);
+    entity.addPrimaryFilter(name, value);
+  }
+
+  /**
+   * Creates a string representation of the byte array from the given offset to
+   * the end of the array (for parsing other info keys).
+   */
+  private static String parseRemainingKey(byte[] b, int offset) {
+    return new String(b, offset, b.length - offset, UTF_8);
+  }
+
+  /**
+   * Creates a related entity key, serializing entity type + revstarttime +
+   * entity id + RELATED_ENTITIES_COLUMN + related entity type + related
+   * entity id.
+   */
+  private static byte[] createRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).add(RELATED_ENTITIES_COLUMN).add(relatedEntityType)
+        .add(relatedEntityId).getBytes();
+  }
+
+  /**
+   * Parses the related entity from the given key at the given offset and adds
+   * it to the given entity.
+   */
+  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser kp = new KeyParser(key, offset);
+    String type = kp.getNextString();
+    String id = kp.getNextString();
+    entity.addRelatedEntity(type, id);
+  }
+
+  /**
+   * Creates a domain id key, serializing entity type + revstarttime +
+   * entity id + DOMAIN_ID_COLUMN.
+   */
+  private static byte[] createDomainIdKey(String entityId, String entityType,
+      byte[] revStartTime) throws IOException {
+    return KeyBuilder.newInstance().add(entityType).add(revStartTime)
+        .add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
+  }
+
+  /**
+   * Clears the cache to test reloading start times from leveldb (only for
+   * testing).
+   */
+  @VisibleForTesting
+  void clearStartTimeCache() {
+    startTimeWriteCache.clear();
+    startTimeReadCache.clear();
+  }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf
+        .getInt(
+            TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf
+        .getInt(
+            TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  long evictOldStartTimes(long minStartTime) throws IOException {
+    LOG.info("Searching for start times to evict earlier than " + minStartTime);
+
+    long batchSize = 0;
+    long totalCount = 0;
+    long startTimesCount = 0;
+
+    WriteBatch writeBatch = null;
+    DBIterator iterator = null;
+
+    try {
+      writeBatch = starttimedb.createWriteBatch();
+      ReadOptions readOptions = new ReadOptions();
+      readOptions.fillCache(false);
+      iterator = starttimedb.iterator(readOptions);
+      // seek to the first start time entry
+      iterator.seekToFirst();
+
+      // evaluate each start time entry to see if it needs to be evicted or not
+      while (iterator.hasNext()) {
+        Map.Entry<byte[], byte[]> current = iterator.next();
+        byte[] entityKey = current.getKey();
+        byte[] entityValue = current.getValue();
+        long startTime = readReverseOrderedLong(entityValue, 0);
+        if (startTime < minStartTime) {
+          ++batchSize;
+          ++startTimesCount;
+          writeBatch.delete(entityKey);
+
+          // flush the batch periodically so that a large delete does not
+          // hold the lock for too long
+          if (batchSize >= writeBatchSize) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Preparing to delete a batch of " + batchSize
+                  + " old start times");
+            }
+            starttimedb.write(writeBatch);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Deleted batch of " + batchSize
+                  + ". Total start times deleted so far this cycle: "
+                  + startTimesCount);
+            }
+            IOUtils.cleanup(LOG, writeBatch);
+            writeBatch = starttimedb.createWriteBatch();
+            batchSize = 0;
+          }
+        }
+        ++totalCount;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Preparing to delete a batch of " + batchSize
+            + " old start times");
+      }
+      starttimedb.write(writeBatch);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleted batch of " + batchSize
+            + ". Total start times deleted so far this cycle: "
+            + startTimesCount);
+      }
+      LOG.info("Deleted " + startTimesCount + "/" + totalCount
+          + " start time entities earlier than " + minStartTime);
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+      IOUtils.cleanup(LOG, iterator);
+    }
+    return startTimesCount;
+  }
+
+  /**
+   * Discards entities whose start timestamp is earlier than the given
+   * timestamp, and evicts any old rolling entity and index databases.
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp) throws IOException,
+      InterruptedException {
+    long totalCount = 0;
+    long t1 = System.currentTimeMillis();
+    try {
+      totalCount += evictOldStartTimes(timestamp);
+      indexdb.evictOldDBs();
+      entitydb.evictOldDBs();
+    } finally {
+      long t2 = System.currentTimeMillis();
+      LOG.info("Discarded " + totalCount + " entities for timestamp "
+          + timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
+    }
+  }
+
+  Version loadVersion() throws IOException {
+    byte[] data = starttimedb.get(bytes(TIMELINE_STORE_VERSION_KEY));
+    // if no version has been stored previously, treat it as 1.0.
+    if (data == null || data.length == 0) {
+      return Version.newInstance(1, 0);
+    }
+    Version version = new VersionPBImpl(VersionProto.parseFrom(data));
+    return version;
+  }
+
+  // Only used for test
+  @VisibleForTesting
+  void storeVersion(Version state) throws IOException {
+    dbStoreVersion(state);
+  }
+
+  private void dbStoreVersion(Version state) throws IOException {
+    String key = TIMELINE_STORE_VERSION_KEY;
+    byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
+    try {
+      starttimedb.put(bytes(key), data);
+    } catch (DBException e) {
+      throw new IOException(e);
+    }
+  }
+
+  Version getCurrentVersion() {
+    return CURRENT_VERSION_INFO;
+  }
+
+  /**
+   * Timeline store versioning scheme:
+   * 1) The store version is major.minor, e.g. 1.0, 1.1, 1.2, ..., 1.25, 2.0.
+   * 2) Any incompatible change to the timeline store is a major upgrade, and
+   * any compatible change is a minor upgrade.
+   * 3) Within a minor upgrade, say 1.1 to 1.2: overwrite the version info
+   * and proceed as normal.
+   * 4) Within a major upgrade, say 1.2 to 2.0: throw an exception and direct
+   * the user to use a separate upgrade tool to upgrade the timeline store or
+   * remove the incompatible old state.
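+   * For example, if the current version is 1.1 and the stored version is
+   * 1.0, the stored version info is simply overwritten; if the stored
+   * version is 2.0, startup fails with an IOException.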
+   */
+  private void checkVersion() throws IOException {
+    Version loadedVersion = loadVersion();
+    LOG.info("Loaded timeline store version info " + loadedVersion);
+    if (loadedVersion.equals(getCurrentVersion())) {
+      return;
+    }
+    if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
+      LOG.info("Storing timeline store version info " + getCurrentVersion());
+      dbStoreVersion(CURRENT_VERSION_INFO);
+    } else {
+      String incompatibleMessage = "Incompatible version for timeline store: "
+          + "expecting version " + getCurrentVersion()
+          + ", but loading version " + loadedVersion;
+      LOG.fatal(incompatibleMessage);
+      throw new IOException(incompatibleMessage);
+    }
+  }
+
+  // TODO: make data retention work with the domain data as well
+  @Override
+  public void put(TimelineDomain domain) throws IOException {
+    WriteBatch domainWriteBatch = null;
+    WriteBatch ownerWriteBatch = null;
+    try {
+      domainWriteBatch = domaindb.createWriteBatch();
+      ownerWriteBatch = ownerdb.createWriteBatch();
+      if (domain.getId() == null || domain.getId().length() == 0) {
+        throw new IllegalArgumentException("Domain doesn't have an ID");
+      }
+      if (domain.getOwner() == null || domain.getOwner().length() == 0) {
+        throw new IllegalArgumentException("Domain doesn't have an owner.");
+      }
+
+      // Write description
+      byte[] domainEntryKey = createDomainEntryKey(domain.getId(),
+          DESCRIPTION_COLUMN);
+      byte[] ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), DESCRIPTION_COLUMN);
+      if (domain.getDescription() != null) {
+        domainWriteBatch.put(domainEntryKey,
+            domain.getDescription().getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey, domain.getDescription()
+            .getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write owner
+      domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), OWNER_COLUMN);
+      // Null check for owner is done before
+      domainWriteBatch.put(domainEntryKey, domain.getOwner().getBytes(UTF_8));
+      ownerWriteBatch.put(ownerLookupEntryKey, domain.getOwner()
+          .getBytes(UTF_8));
+
+      // Write readers
+      domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), READER_COLUMN);
+      if (domain.getReaders() != null && domain.getReaders().length() > 0) {
+        domainWriteBatch.put(domainEntryKey, domain.getReaders()
+            .getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey,
+            domain.getReaders().getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write writers
+      domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), WRITER_COLUMN);
+      if (domain.getWriters() != null && domain.getWriters().length() > 0) {
+        domainWriteBatch.put(domainEntryKey, domain.getWriters()
+            .getBytes(UTF_8));
+        ownerWriteBatch.put(ownerLookupEntryKey,
+            domain.getWriters().getBytes(UTF_8));
+      } else {
+        domainWriteBatch.put(domainEntryKey, EMPTY_BYTES);
+        ownerWriteBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
+      }
+
+      // Write creation time and modification time
+      // We put both timestamps together because they are always retrieved
+      // together, and store them in the same way as we did for the entity's
+      // start time and insert time.
+      domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
+      ownerLookupEntryKey = createOwnerLookupKey(domain.getOwner(),
+          domain.getId(), TIMESTAMP_COLUMN);
+      long currentTimestamp = System.currentTimeMillis();
+      byte[] timestamps = domaindb.get(domainEntryKey);
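+      // the 16-byte value holds the creation time in the first 8 bytes and
+      // the modification time in the last 8 bytes (both reverse-ordered),
+      // which is how getTimelineDomain reads them back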
+      if (timestamps == null) {
+        timestamps = new byte[16];
+        writeReverseOrderedLong(currentTimestamp, timestamps, 0);
+        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
+      } else {
+        writeReverseOrderedLong(currentTimestamp, timestamps, 8);
+      }
+      domainWriteBatch.put(domainEntryKey, timestamps);
+      ownerWriteBatch.put(ownerLookupEntryKey, timestamps);
+      domaindb.write(domainWriteBatch);
+      ownerdb.write(ownerWriteBatch);
+    } finally {
+      IOUtils.cleanup(LOG, domainWriteBatch);
+      IOUtils.cleanup(LOG, ownerWriteBatch);
+    }
+  }
+
+  /**
+   * Creates a domain entry key with a column name suffix, of the form
+   * domain id + column name.
+   */
+  private static byte[] createDomainEntryKey(String domainId, byte[] columnName)
+      throws IOException {
+    return KeyBuilder.newInstance().add(domainId).add(columnName).getBytes();
+  }
+
+  /**
+   * Creates an owner lookup key with a column name suffix, of the form
+   * owner + domain id + column name.
+   */
+  private static byte[] createOwnerLookupKey(String owner, String domainId,
+      byte[] columnName) throws IOException {
+    return KeyBuilder.newInstance().add(owner).add(domainId).add(columnName)
+        .getBytes();
+  }
+
+  @Override
+  public TimelineDomain getDomain(String domainId) throws IOException {
+    DBIterator iterator = null;
+    try {
+      byte[] prefix = KeyBuilder.newInstance().add(domainId)
+          .getBytesForLookup();
+      iterator = domaindb.iterator();
+      iterator.seek(prefix);
+      return getTimelineDomain(iterator, domainId, prefix);
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  @Override
+  public TimelineDomains getDomains(String owner) throws IOException {
+    DBIterator iterator = null;
+    try {
+      byte[] prefix = KeyBuilder.newInstance().add(owner).getBytesForLookup();
+      List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
+      for (iterator = ownerdb.iterator(), iterator.seek(prefix); iterator
+          .hasNext();) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(prefix, prefix.length, key)) {
+          break;
+        }
+        // Parse the domain id out of the key following the owner prefix
+        KeyParser kp = new KeyParser(key, prefix.length);
+        String domainId = kp.getNextString();
+        byte[] prefixExt = KeyBuilder.newInstance().add(owner).add(domainId)
+            .getBytesForLookup();
+        TimelineDomain domainToReturn = getTimelineDomain(iterator, domainId,
+            prefixExt);
+        if (domainToReturn != null) {
+          domains.add(domainToReturn);
+        }
+      }
+      // Sort the domains to return
+      Collections.sort(domains, new Comparator<TimelineDomain>() {
+        @Override
+        public int compare(TimelineDomain domain1, TimelineDomain domain2) {
+          int result = domain2.getCreatedTime().compareTo(
+              domain1.getCreatedTime());
+          if (result == 0) {
+            return domain2.getModifiedTime().compareTo(
+                domain1.getModifiedTime());
+          } else {
+            return result;
+          }
+        }
+      });
+      TimelineDomains domainsToReturn = new TimelineDomains();
+      domainsToReturn.addDomains(domains);
+      return domainsToReturn;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  private static TimelineDomain getTimelineDomain(DBIterator iterator,
+      String domainId, byte[] prefix) throws IOException {
+    // Iterate over all the rows whose key starts with prefix to retrieve the
+    // domain information.
+    TimelineDomain domain = new TimelineDomain();
+    domain.setId(domainId);
+    boolean noRows = true;
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefix.length, key)) {
+        break;
+      }
+      if (noRows) {
+        noRows = false;
+      }
+      byte[] value = iterator.peekNext().getValue();
+      if (value != null && value.length > 0) {
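+        // the column is identified by the first byte of the column-name
+        // suffix that follows the domain id prefix in the key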
+        if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
+          domain.setDescription(new String(value, UTF_8));
+        } else if (key[prefix.length] == OWNER_COLUMN[0]) {
+          domain.setOwner(new String(value, UTF_8));
+        } else if (key[prefix.length] == READER_COLUMN[0]) {
+          domain.setReaders(new String(value, UTF_8));
+        } else if (key[prefix.length] == WRITER_COLUMN[0]) {
+          domain.setWriters(new String(value, UTF_8));
+        } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
+          domain.setCreatedTime(readReverseOrderedLong(value, 0));
+          domain.setModifiedTime(readReverseOrderedLong(value, 8));
+        } else {
+          LOG.error("Unrecognized domain column: " + key[prefix.length]);
+        }
+      }
+    }
+    if (noRows) {
+      return null;
+    } else {
+      return domain;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
index 8c6b83a..86aae77 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineDataManager.java
@@ -18,8 +18,6 @@
 
 package org.apache.hadoop.yarn.server.timeline;
 
-import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
 import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -51,7 +48,7 @@ import com.google.common.annotations.VisibleForTesting;
  * This class wraps the timeline store and the ACLs manager. It does some
  * non-trivial manipulation of the timeline data before putting it into or
  * after getting it from the timeline store, and checks the user's access
  * to it.
- * 
+ *
  */
 public class TimelineDataManager extends AbstractService {
 
@@ -119,7 +116,7 @@ public class TimelineDataManager extends AbstractService {
   * Get the timeline entities that the given user has access to. The meaning
    * of each argument has been documented with
    * {@link TimelineReader#getEntities}.
-   * 
+   *
    * @see TimelineReader#getEntities
    */
   public TimelineEntities getEntities(
@@ -156,7 +153,7 @@ public class TimelineDataManager extends AbstractService {
    * Get the single timeline entity that the given user has access to. The
    * meaning of each argument has been documented with
    * {@link TimelineReader#getEntity}.
-   * 
+   *
    * @see TimelineReader#getEntity
    */
   public TimelineEntity getEntity(
@@ -182,7 +179,7 @@ public class TimelineDataManager extends AbstractService {
    * Get the events whose entities the given user has access to. The meaning of
    * each argument has been documented with
    * {@link TimelineReader#getEntityTimelines}.
-   * 
+   *
    * @see TimelineReader#getEntityTimelines
    */
   public TimelineEvents getEvents(
@@ -218,7 +215,7 @@ public class TimelineDataManager extends AbstractService {
             eventsItr.remove();
           }
         } catch (Exception e) {
-          LOG.error("Error when verifying access for user " + callerUGI
+          LOG.warn("Error when verifying access for user " + callerUGI
               + " on the events of the timeline entity "
               + new EntityIdentifier(eventsOfOneEntity.getEntityId(),
                   eventsOfOneEntity.getEntityType()), e);
@@ -242,13 +239,10 @@ public class TimelineDataManager extends AbstractService {
     if (entities == null) {
       return new TimelinePutResponse();
     }
-    List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
     TimelineEntities entitiesToPut = new TimelineEntities();
     List<TimelinePutResponse.TimelinePutError> errors =
         new ArrayList<TimelinePutResponse.TimelinePutError>();
     for (TimelineEntity entity : entities.getEntities()) {
-      EntityIdentifier entityID =
-          new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
 
       // if the domain id is not specified, the entity will be put into
       // the default domain
@@ -261,44 +255,42 @@ public class TimelineDataManager extends AbstractService {
       TimelineEntity existingEntity = null;
       try {
         existingEntity =
-            store.getEntity(entityID.getId(), entityID.getType(),
+            store.getEntity(entity.getEntityId(), entity.getEntityType(),
                 EnumSet.of(Field.PRIMARY_FILTERS));
         if (existingEntity != null) {
           addDefaultDomainIdIfAbsent(existingEntity);
           if (!existingEntity.getDomainId().equals(entity.getDomainId())) {
             throw new YarnException("The domain of the timeline entity "
-              + entityID + " is not allowed to be changed.");
+              + "{ id: " + entity.getEntityId() + ", type: "
+              + entity.getEntityType() + " } is not allowed to be changed from "
+              + existingEntity.getDomainId() + " to " + entity.getDomainId());
           }
         }
         if (!timelineACLsManager.checkAccess(
             callerUGI, ApplicationAccessType.MODIFY_APP, entity)) {
           throw new YarnException(callerUGI
-              + " is not allowed to put the timeline entity " + entityID
-              + " into the domain " + entity.getDomainId() + ".");
+              + " is not allowed to put the timeline entity "
+              + "{ id: " + entity.getEntityId() + ", type: "
+              + entity.getEntityType() + " } into the domain "
+              + entity.getDomainId() + ".");
         }
       } catch (Exception e) {
         // Skip the entity which already exists and was put by others
-        LOG.error("Skip the timeline entity: " + entityID, e);
+        LOG.warn("Skip the timeline entity: { id: " + entity.getEntityId()
+            + ", type: "+ entity.getEntityType() + " }", e);
         TimelinePutResponse.TimelinePutError error =
             new TimelinePutResponse.TimelinePutError();
-        error.setEntityId(entityID.getId());
-        error.setEntityType(entityID.getType());
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
         error.setErrorCode(
             TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
         errors.add(error);
         continue;
       }
 
-      entityIDs.add(entityID);
       entitiesToPut.addEntity(entity);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
-            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
-      }
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
     }
+
     TimelinePutResponse response = store.put(entitiesToPut);
     // add the errors of timeline system filter key conflict
     response.addErrors(errors);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
index fc6cc7d..5638581 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/util/LeveldbUtils.java
@@ -21,15 +21,16 @@ package org.apache.hadoop.yarn.server.timeline.util;
 
 import org.apache.hadoop.io.WritableComparator;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.charset.Charset;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
 
 public class LeveldbUtils {
 
+  /** A builder utility for constructing timeline server leveldb keys. */
   public static class KeyBuilder {
+    /** Maximum subkeys that can be added to construct a key. */
     private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
     private byte[][] b;
     private boolean[] useSeparator;
@@ -47,8 +48,15 @@ public class LeveldbUtils {
       return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
     }
 
+    /** Instantiates a new key builder with the given maximum number of
+     * subkeys.
+     * @param size maximum subkeys that can be added to this key builder
+     * @return a newly constructed key builder */
+    public static KeyBuilder newInstance(final int size) {
+      return new KeyBuilder(size);
+    }
+
     public KeyBuilder add(String s) {
-      return add(s.getBytes(Charset.forName("UTF-8")), true);
+      return add(s.getBytes(UTF_8), true);
     }
 
     public KeyBuilder add(byte[] t) {
@@ -66,26 +74,37 @@ public class LeveldbUtils {
       return this;
     }
 
-    public byte[] getBytes() throws IOException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+    /** Builds a byte array without the final string delimiter. */
+    public byte[] getBytes() {
+      // if the last element requested a separator, the accumulated length
+      // includes a trailing separator byte that getBytes() omits
+      int bytesLength = length;
+      if (useSeparator[index - 1]) {
+        bytesLength = length - 1;
+      }
+      byte[] bytes = new byte[bytesLength];
+      int curPos = 0;
       for (int i = 0; i < index; i++) {
-        baos.write(b[i]);
+        System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
+        curPos += b[i].length;
         if (i < index - 1 && useSeparator[i]) {
-          baos.write(0x0);
+          bytes[curPos++] = 0x0;
         }
       }
-      return baos.toByteArray();
+      return bytes;
     }
 
-    public byte[] getBytesForLookup() throws IOException {
-      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+    /** Builds a byte array including the final string delimiter. */
+    public byte[] getBytesForLookup() {
+      byte[] bytes = new byte[length];
+      int curPos = 0;
       for (int i = 0; i < index; i++) {
-        baos.write(b[i]);
+        System.arraycopy(b[i], 0, bytes, curPos, b[i].length);
+        curPos += b[i].length;
         if (useSeparator[i]) {
-          baos.write(0x0);
+          bytes[curPos++] = 0x0;
         }
       }
-      return baos.toByteArray();
+      return bytes;
     }
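+
+    // Illustrative example: newInstance().add("type").add("id").getBytes()
+    // yields the bytes of "type" + 0x0 + "id", while getBytesForLookup()
+    // yields "type" + 0x0 + "id" + 0x0, the form used as a prefix for scans.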
   }
 
@@ -93,11 +112,12 @@ public class LeveldbUtils {
     private final byte[] b;
     private int offset;
 
-    public KeyParser(byte[] b, int offset) {
+    public KeyParser(final byte[] b, final int offset) {
       this.b = b;
       this.offset = offset;
     }
 
+    /** Returns a string from the offset until the next string delimiter. */
     public String getNextString() throws IOException {
       if (offset >= b.length) {
         throw new IOException(
@@ -107,23 +127,42 @@ public class LeveldbUtils {
       while (offset + i < b.length && b[offset + i] != 0x0) {
         i++;
       }
-      String s = new String(b, offset, i, Charset.forName("UTF-8"));
+      String s = new String(b, offset, i, UTF_8);
       offset = offset + i + 1;
       return s;
     }
 
+    /** Moves current position until after the next end of string marker. */
+    public void skipNextString() throws IOException {
+      if (offset >= b.length) {
+        throw new IOException("tried to read nonexistent string from byte array");
+      }
+      while (offset < b.length && b[offset] != 0x0) {
+        ++offset;
+      }
+      ++offset;
+    }
+
+    /** Read the next 8 bytes in the byte buffer as a long. */
     public long getNextLong() throws IOException {
       if (offset + 8 >= b.length) {
         throw new IOException("byte array ran out when trying to read long");
       }
-      long l = readReverseOrderedLong(b, offset);
+      long value = readReverseOrderedLong(b, offset);
       offset += 8;
-      return l;
+      return value;
     }
 
     public int getOffset() {
       return offset;
     }
+
+    /** Returns a copy of the remaining bytes. */
+    public byte[] getRemainingBytes() {
+      byte[] bytes = new byte[b.length - offset];
+      System.arraycopy(b, offset, bytes, 0, b.length - offset);
+      return bytes;
+    }
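+
+    // Illustrative example: given a key built as
+    // KeyBuilder.newInstance().add("type").add("id").getBytesForLookup(),
+    // a KeyParser starting at offset 0 returns "type" from the first
+    // getNextString() call and "id" from the second.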
   }
 
   /**

