hadoop-yarn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zjs...@apache.org
Subject svn commit: r1598094 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/resources/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/appl...
Date Wed, 28 May 2014 18:09:05 GMT
Author: zjshen
Date: Wed May 28 18:09:04 2014
New Revision: 1598094

URL: http://svn.apache.org/r1598094
Log:
YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityIdentifier.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/GenericObjectMapper.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/MemoryTimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/NameValuePair.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineReader.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/TimelineWriter.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/package-info.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineACLsManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilter.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineAuthenticationFilterInitializer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineClientAuthenticationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/security/TimelineDelegationTokenSecretManagerService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/webapp/TimelineWebServices.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestGenericObjectMapper.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestMemoryTimelineStore.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/security/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/security/TestTimelineACLsManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/webapp/TestTimelineWebServices.java
Removed:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TimelineWebServices.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestTimelineWebServices.java
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/TimelineServer.apt.vm

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1598094&r1=1598093&r2=1598094&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Wed May 28 18:09:04 2014
@@ -111,6 +111,9 @@ Release 2.5.0 - UNRELEASED
     YARN-2071. Modified levelDB store permissions to be readable only by the
     server user. (Zhijie Shen via vinodkv)
 
+    YARN-2107. Refactored timeline classes into o.a.h.y.s.timeline package. (Vinod
+    Kumar Vavilapalli via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1598094&r1=1598093&r2=1598094&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Wed May 28 18:09:04 2014
@@ -1143,7 +1143,7 @@
   <property>
     <description>Store class name for timeline store.</description>
     <name>yarn.timeline-service.store-class</name>
-    <value>org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore</value>
+    <value>org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore</value>
   </property>
 
   <property>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java?rev=1598094&r1=1598093&r2=1598094&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/ApplicationHistoryServer.java Wed May 28 18:09:04 2014
@@ -38,12 +38,12 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.LeveldbTimelineStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineAuthenticationFilterInitializer;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineDelegationTokenSecretManagerService;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
+import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java?rev=1598094&r1=1598093&r2=1598094&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebApp.java Wed May 28 18:09:04 2014
@@ -22,9 +22,10 @@ import static org.apache.hadoop.yarn.uti
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.yarn.server.api.ApplicationContext;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.TimelineStore;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineACLsManager;
-import org.apache.hadoop.yarn.server.applicationhistoryservice.timeline.security.TimelineDelegationTokenSecretManagerService;
+import org.apache.hadoop.yarn.server.timeline.TimelineStore;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
+import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
+import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityIdentifier.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityIdentifier.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityIdentifier.java Wed May 28 18:09:04 2014
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * The unique identifier for an entity.
+ */
+@Private
+@Unstable
+public class EntityIdentifier implements Comparable<EntityIdentifier> {
+
+  private String id;
+  private String type;
+
+  public EntityIdentifier(String id, String type) {
+    this.id = id;
+    this.type = type;
+  }
+
+  /**
+   * Get the entity Id.
+   * @return The entity Id.
+   */
+  public String getId() {
+    return id;
+  }
+
+  /**
+   * Get the entity type.
+   * @return The entity type.
+   */
+  public String getType() {
+    return type;
+  }
+
+  @Override
+  public int compareTo(EntityIdentifier other) {
+    int c = type.compareTo(other.type);
+    if (c != 0) return c;
+    return id.compareTo(other.id);
+  }
+
+  @Override
+  public int hashCode() {
+    // generated by eclipse
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((id == null) ? 0 : id.hashCode());
+    result = prime * result + ((type == null) ? 0 : type.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    // generated by eclipse
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    EntityIdentifier other = (EntityIdentifier) obj;
+    if (id == null) {
+      if (other.id != null)
+        return false;
+    } else if (!id.equals(other.id))
+      return false;
+    if (type == null) {
+      if (other.type != null)
+        return false;
+    } else if (!type.equals(other.type))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "{ id: " + id + ", type: "+ type + " }";
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/GenericObjectMapper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/GenericObjectMapper.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/GenericObjectMapper.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/GenericObjectMapper.java Wed May 28 18:09:04 2014
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.codehaus.jackson.map.ObjectWriter;
+
+/**
+ * A utility class providing methods for serializing and deserializing
+ * objects. The {@link #write(Object)} and {@link #read(byte[])} methods are
+ * used by the {@link LeveldbTimelineStore} to store and retrieve arbitrary
+ * JSON, while the {@link #writeReverseOrderedLong} and {@link
+ * #readReverseOrderedLong} methods are used to sort entities in descending
+ * start time order.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class GenericObjectMapper {
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  public static final ObjectReader OBJECT_READER;
+  public static final ObjectWriter OBJECT_WRITER;
+
+  static {
+    ObjectMapper mapper = new ObjectMapper();
+    OBJECT_READER = mapper.reader(Object.class);
+    OBJECT_WRITER = mapper.writer();
+  }
+
+  /**
+   * Serializes an Object into a byte array. Along with {@link #read(byte[])},
+   * can be used to serialize an Object and deserialize it into an Object of
+   * the same type without needing to specify the Object's type,
+   * as long as it is one of the JSON-compatible objects understood by
+   * ObjectMapper.
+   *
+   * @param o An Object
+   * @return A byte array representation of the Object
+   * @throws IOException if there is a write error
+   */
+  public static byte[] write(Object o) throws IOException {
+    if (o == null) {
+      return EMPTY_BYTES;
+    }
+    return OBJECT_WRITER.writeValueAsBytes(o);
+  }
+
+  /**
+   * Deserializes an Object from a byte array created with
+   * {@link #write(Object)}.
+   *
+   * @param b A byte array
+   * @return An Object
+   * @throws IOException if there is a read error
+   */
+  public static Object read(byte[] b) throws IOException {
+    return read(b, 0);
+  }
+
+  /**
+   * Deserializes an Object from a byte array at a specified offset, assuming
+   * the bytes were created with {@link #write(Object)}.
+   *
+   * @param b A byte array
+   * @param offset Offset into the array
+   * @return An Object
+   * @throws IOException if there is a read error
+   */
+  public static Object read(byte[] b, int offset) throws IOException {
+    if (b == null || b.length == 0) {
+      return null;
+    }
+    return OBJECT_READER.readValue(b, offset, b.length - offset);
+  }
+
+  /**
+   * Converts a long to an 8-byte array so that lexicographic ordering of the
+   * produced byte arrays sorts the longs in descending order.
+   *
+   * @param l A long
+   * @return A byte array
+   */
+  public static byte[] writeReverseOrderedLong(long l) {
+    byte[] b = new byte[8];
+    return writeReverseOrderedLong(l, b, 0);
+  }
+
+  public static byte[] writeReverseOrderedLong(long l, byte[] b, int offset) {
+    b[offset] = (byte)(0x7f ^ ((l >> 56) & 0xff));
+    for (int i = offset+1; i < offset+7; i++) {
+      b[i] = (byte)(0xff ^ ((l >> 8*(7-i)) & 0xff));
+    }
+    b[offset+7] = (byte)(0xff ^ (l & 0xff));
+    return b;
+  }
+
+  /**
+   * Reads 8 bytes from an array starting at the specified offset and
+   * converts them to a long.  The bytes are assumed to have been created
+   * with {@link #writeReverseOrderedLong}.
+   *
+   * @param b A byte array
+   * @param offset An offset into the byte array
+   * @return A long
+   */
+  public static long readReverseOrderedLong(byte[] b, int offset) {
+    long l = b[offset] & 0xff;
+    for (int i = 1; i < 8; i++) {
+      l = l << 8;
+      l = l | (b[offset+i]&0xff);
+    }
+    return l ^ 0x7fffffffffffffffl;
+  }
+
+}

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java?rev=1598094&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/timeline/LeveldbTimelineStore.java Wed May 28 18:09:04 2014
@@ -0,0 +1,1492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timeline;
+
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
+import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
+import org.iq80.leveldb.WriteBatch;
+import org.iq80.leveldb.WriteOptions;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * <p>An implementation of an application timeline store backed by leveldb.</p>
+ *
+ * <p>There are three sections of the db: the start time section,
+ * the entity section, and the indexed entity section.</p>
+ *
+ * <p>The start time section is used to retrieve the unique start time for
+ * a given entity. Its values each contain a start time while its keys are of
+ * the form:</p>
+ * <pre>
+ *   START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
+ *
+ * <p>The entity section is ordered by entity type, then entity start time
+ * descending, then entity ID. There are four sub-sections of the entity
+ * section: events, primary filters, related entities,
+ * and other info. The event entries have event info serialized into their
+ * values. The other info entries have values corresponding to the values of
+ * the other info name/value map for the entry (note the names are contained
+ * in the key). All other entries have empty values. The key structure is as
+ * follows:</p>
+ * <pre>
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     EVENTS_COLUMN + reveventtimestamp + eventtype
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     PRIMARY_FILTERS_COLUMN + name + value
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     OTHER_INFO_COLUMN + name
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
+ *
+ *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+ *     INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
+ *     relatedentity id</pre>
+ *
+ * <p>The indexed entity section contains a primary filter name and primary
+ * filter value as the prefix. Within a given name/value, entire entity
+ * entries are stored in the same format as described in the entity section
+ * above (below, "key" represents any one of the possible entity entry keys
+ * described above).</p>
+ * <pre>
+ *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
+ *     key</pre>
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class LeveldbTimelineStore extends AbstractService
+    implements TimelineStore {
+  private static final Log LOG = LogFactory
+      .getLog(LeveldbTimelineStore.class);
+
+  @Private
+  @VisibleForTesting
+  static final String FILENAME = "leveldb-timeline-store.ldb";
+
+  private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes();
+  private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes();
+  private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes();
+
+  private static final byte[] EVENTS_COLUMN = "e".getBytes();
+  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes();
+  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes();
+  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes();
+  private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
+      "z".getBytes();
+
+  private static final byte[] EMPTY_BYTES = new byte[0];
+
+  @Private
+  @VisibleForTesting
+  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
+      .createImmutable((short) 0700);
+
+  private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  /**
+   * Per-entity locks are obtained when writing.
+   */
+  private final LockMap<EntityIdentifier> writeLocks =
+      new LockMap<EntityIdentifier>();
+
+  private final ReentrantReadWriteLock deleteLock =
+      new ReentrantReadWriteLock();
+
+  private DB db;
+
+  private Thread deletionThread;
+
+  public LeveldbTimelineStore() {
+    super(LeveldbTimelineStore.class.getName());
+  }
+
+  /**
+   * Opens (creating if necessary) the leveldb store, sets up the start time
+   * caches and, when TTL is enabled, launches the deletion thread.
+   *
+   * @param conf Configuration supplying the store path, cache sizes and TTL
+   *             settings
+   * @throws Exception if the db directory cannot be created or the db cannot
+   *                   be opened
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  protected void serviceInit(Configuration conf) throws Exception {
+    Options dbOptions = new Options();
+    dbOptions.createIfMissing(true);
+    long readCacheSize = conf.getLong(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
+    dbOptions.cacheSize(readCacheSize);
+    JniDBFactory dbFactory = new JniDBFactory();
+    String storeRoot =
+        conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH);
+    Path dbPath = new Path(storeRoot, FILENAME);
+
+    // make sure the db directory exists, restricting a newly created
+    // directory to its owner
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.getLocal(conf);
+      if (!fs.exists(dbPath)) {
+        boolean created = fs.mkdirs(dbPath);
+        if (!created) {
+          throw new IOException("Couldn't create directory for leveldb " +
+              "timeline store " + dbPath);
+        }
+        fs.setPermission(dbPath, LEVELDB_DIR_UMASK);
+      }
+    } finally {
+      IOUtils.cleanup(LOG, fs);
+    }
+
+    LOG.info("Using leveldb path " + dbPath);
+    File dbDir = new File(dbPath.toString());
+    db = dbFactory.open(dbDir, dbOptions);
+
+    startTimeWriteCache = Collections.synchronizedMap(
+        new LRUMap(getStartTimeWriteCacheSize(conf)));
+    startTimeReadCache = Collections.synchronizedMap(
+        new LRUMap(getStartTimeReadCacheSize(conf)));
+
+    boolean ttlEnabled =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true);
+    if (ttlEnabled) {
+      deletionThread = new EntityDeletionThread(conf);
+      deletionThread.start();
+    }
+
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Stops the deletion thread (if one was started), waits for it to finish
+   * its current action and then closes the db.
+   *
+   * @throws Exception if the superclass stop fails
+   */
+  @Override
+  protected void serviceStop() throws Exception {
+    if (deletionThread != null) {
+      deletionThread.interrupt();
+      LOG.info("Waiting for deletion thread to complete its current action");
+      try {
+        deletionThread.join();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for deletion thread to complete," +
+            " closing db now", e);
+        // restore the interrupt status so that whoever is stopping this
+        // service can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+    }
+    IOUtils.cleanup(LOG, db);
+    super.serviceStop();
+  }
+
+  /**
+   * Immutable pair holding an entity's start time together with the time at
+   * which that start time was first written to the store.
+   */
+  private static class StartAndInsertTime {
+    final long startTime, insertTime;
+
+    /**
+     * @param startTime The start time of the entity
+     * @param insertTime The time the start time was inserted
+     */
+    public StartAndInsertTime(long startTime, long insertTime) {
+      this.insertTime = insertTime;
+      this.startTime = startTime;
+    }
+  }
+
+  /**
+   * Background thread that periodically discards entities older than the
+   * configured TTL.  It runs until it is interrupted.
+   */
+  private class EntityDeletionThread extends Thread {
+    // age in ms beyond which entities are discarded
+    private final long ttl;
+    // pause in ms between two deletion cycles
+    private final long ttlInterval;
+
+    /**
+     * @param conf Configuration supplying the TTL and the cycle interval
+     */
+    public EntityDeletionThread(Configuration conf) {
+      ttl  = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
+      ttlInterval = conf.getLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
+      LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
+          "interval " + ttlInterval);
+    }
+
+    /**
+     * Repeats "discard, then sleep" until interrupted.  A failed discard is
+     * logged and followed by the normal sleep, so a persistent db error
+     * cannot turn this loop into a busy spin.
+     */
+    @Override
+    public void run() {
+      while (true) {
+        long timestamp = System.currentTimeMillis() - ttl;
+        try {
+          try {
+            discardOldEntities(timestamp);
+          } catch (IOException e) {
+            LOG.error(e);
+          }
+          // always pause between cycles, whether or not the discard worked
+          Thread.sleep(ttlInterval);
+        } catch (InterruptedException e) {
+          LOG.info("Deletion thread received interrupt, exiting");
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Hands out one lock per key and forgets a lock again once every holder
+   * has returned it.  Both entry points are synchronized on the map
+   * instance.
+   */
+  private static class LockMap<K> {
+    /**
+     * A reentrant lock that also tracks how many times it has been handed
+     * out and which key it belongs to.
+     */
+    private static class CountingReentrantLock<K> extends ReentrantLock {
+      private static final long serialVersionUID = 1L;
+      private int count;
+      private K key;
+
+      CountingReentrantLock(K key) {
+        super();
+        this.key = key;
+        this.count = 0;
+      }
+    }
+
+    private Map<K, CountingReentrantLock<K>> locks =
+        new HashMap<K, CountingReentrantLock<K>>();
+
+    /**
+     * Returns the lock registered for the key, creating it on first use,
+     * and records one more outstanding retrieval.
+     */
+    synchronized CountingReentrantLock<K> getLock(K key) {
+      CountingReentrantLock<K> entry = locks.get(key);
+      if (entry == null) {
+        entry = new CountingReentrantLock<K>(key);
+        locks.put(key, entry);
+      }
+      entry.count += 1;
+      return entry;
+    }
+
+    /**
+     * Records that one retrieval of the lock is finished; the lock is
+     * dropped from the map when no retrievals remain outstanding.
+     *
+     * @throws IllegalStateException if the lock has no outstanding
+     *                               retrievals
+     */
+    synchronized void returnLock(CountingReentrantLock<K> lock) {
+      if (lock.count == 0) {
+        throw new IllegalStateException("Returned lock more times than it " +
+            "was retrieved");
+      }
+      lock.count -= 1;
+      if (lock.count > 0) {
+        return;
+      }
+      locks.remove(lock.key);
+    }
+  }
+
+  private static class KeyBuilder {
+    private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
+    private byte[][] b;
+    private boolean[] useSeparator;
+    private int index;
+    private int length;
+
+    public KeyBuilder(int size) {
+      b = new byte[size][];
+      useSeparator = new boolean[size];
+      index = 0;
+      length = 0;
+    }
+
+    public static KeyBuilder newInstance() {
+      return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
+    }
+
+    public KeyBuilder add(String s) {
+      return add(s.getBytes(), true);
+    }
+
+    public KeyBuilder add(byte[] t) {
+      return add(t, false);
+    }
+
+    public KeyBuilder add(byte[] t, boolean sep) {
+      b[index] = t;
+      useSeparator[index] = sep;
+      length += t.length;
+      if (sep) {
+        length++;
+      }
+      index++;
+      return this;
+    }
+
+    public byte[] getBytes() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+      for (int i = 0; i < index; i++) {
+        baos.write(b[i]);
+        if (i < index-1 && useSeparator[i]) {
+          baos.write(0x0);
+        }
+      }
+      return baos.toByteArray();
+    }
+
+    public byte[] getBytesForLookup() throws IOException {
+      ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
+      for (int i = 0; i < index; i++) {
+        baos.write(b[i]);
+        if (useSeparator[i]) {
+          baos.write(0x0);
+        }
+      }
+      return baos.toByteArray();
+    }
+  }
+
+  /**
+   * Reads successive components out of a db key, starting at a given offset.
+   */
+  private static class KeyParser {
+    private final byte[] b;
+    private int offset;
+
+    /**
+     * @param b The key to parse
+     * @param offset The position at which parsing starts
+     */
+    public KeyParser(byte[] b, int offset) {
+      this.b = b;
+      this.offset = offset;
+    }
+
+    /**
+     * Reads the bytes up to the next 0x0 separator (or the end of the key)
+     * as a String and moves past the separator.
+     *
+     * @throws IOException if the key has already been consumed
+     */
+    public String getNextString() throws IOException {
+      if (offset >= b.length) {
+        throw new IOException(
+            "tried to read nonexistent string from byte array");
+      }
+      int i = 0;
+      while (offset+i < b.length && b[offset+i] != 0x0) {
+        i++;
+      }
+      String s = new String(b, offset, i);
+      offset = offset + i + 1;
+      return s;
+    }
+
+    /**
+     * Reads the next 8 bytes as a reverse-ordered long.
+     *
+     * @throws IOException if fewer than 8 bytes remain
+     */
+    public long getNextLong() throws IOException {
+      // a long needs bytes offset..offset+7, so exactly 8 remaining bytes
+      // (offset+8 == b.length) is still a valid read
+      if (offset+8 > b.length) {
+        throw new IOException("byte array ran out when trying to read long");
+      }
+      long l = readReverseOrderedLong(b, offset);
+      offset += 8;
+      return l;
+    }
+
+    /** Returns the position of the next unread byte. */
+    public int getOffset() {
+      return offset;
+    }
+  }
+
+  /**
+   * Looks up a single entity by id and type.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @param fields The fields to retrieve; null selects all fields
+   * @return The entity, or null if no start time is known for it
+   * @throws IOException
+   */
+  @Override
+  public TimelineEntity getEntity(String entityId, String entityType,
+      EnumSet<Field> fields) throws IOException {
+    Long startTime = getStartTimeLong(entityId, entityType);
+    if (startTime == null) {
+      return null;
+    }
+
+    // all of the entity's entries share this key prefix
+    KeyBuilder kb = KeyBuilder.newInstance();
+    kb.add(ENTITY_ENTRY_PREFIX);
+    kb.add(entityType);
+    kb.add(writeReverseOrderedLong(startTime));
+    kb.add(entityId);
+    byte[] entityPrefix = kb.getBytesForLookup();
+
+    DBIterator dbIterator = null;
+    try {
+      dbIterator = db.iterator();
+      dbIterator.seek(entityPrefix);
+      return getEntity(entityId, entityType, startTime, fields, dbIterator,
+          entityPrefix, entityPrefix.length);
+    } finally {
+      IOUtils.cleanup(LOG, dbIterator);
+    }
+  }
+
+  /**
+   * Read entity from a db iterator.  If no information is found in the
+   * specified fields for this entity, return null.
+   */
+  private static TimelineEntity getEntity(String entityId, String entityType,
+      Long startTime, EnumSet<Field> fields, DBIterator iterator,
+      byte[] prefix, int prefixlen) throws IOException {
+    if (fields == null) {
+      fields = EnumSet.allOf(Field.class);
+    }
+
+    TimelineEntity entity = new TimelineEntity();
+    boolean events = false;
+    boolean lastEvent = false;
+    if (fields.contains(Field.EVENTS)) {
+      events = true;
+    } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
+      lastEvent = true;
+    } else {
+      entity.setEvents(null);
+    }
+    boolean relatedEntities = false;
+    if (fields.contains(Field.RELATED_ENTITIES)) {
+      relatedEntities = true;
+    } else {
+      entity.setRelatedEntities(null);
+    }
+    boolean primaryFilters = false;
+    if (fields.contains(Field.PRIMARY_FILTERS)) {
+      primaryFilters = true;
+    } else {
+      entity.setPrimaryFilters(null);
+    }
+    boolean otherInfo = false;
+    if (fields.contains(Field.OTHER_INFO)) {
+      otherInfo = true;
+    } else {
+      entity.setOtherInfo(null);
+    }
+
+    // iterate through the entity's entry, parsing information if it is part
+    // of a requested field
+    for (; iterator.hasNext(); iterator.next()) {
+      byte[] key = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefixlen, key)) {
+        break;
+      }
+      if (key.length == prefixlen) {
+        continue;
+      }
+      if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+        if (primaryFilters) {
+          addPrimaryFilter(entity, key,
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
+        }
+      } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
+        if (otherInfo) {
+          entity.addOtherInfo(parseRemainingKey(key,
+              prefixlen + OTHER_INFO_COLUMN.length),
+              GenericObjectMapper.read(iterator.peekNext().getValue()));
+        }
+      } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+        if (relatedEntities) {
+          addRelatedEntity(entity, key,
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
+        }
+      } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
+        if (events || (lastEvent &&
+            entity.getEvents().size() == 0)) {
+          TimelineEvent event = getEntityEvent(null, key, prefixlen +
+              EVENTS_COLUMN.length, iterator.peekNext().getValue());
+          if (event != null) {
+            entity.addEvent(event);
+          }
+        }
+      } else {
+        if (key[prefixlen] !=
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          LOG.warn(String.format("Found unexpected column for entity %s of " +
+              "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
+        }
+      }
+    }
+
+    entity.setEntityId(entityId);
+    entity.setEntityType(entityType);
+    entity.setStartTime(startTime);
+
+    return entity;
+  }
+
+  /**
+   * Retrieves the events of the given entities.  Entities for which no
+   * start time is known are skipped.  For every remaining entity, iteration
+   * starts at the key built from windowEnd and stops when the limit is
+   * reached, when the keys no longer belong to the entity's events, or when
+   * they sort past the key built from windowStart.
+   *
+   * @param entityType The type of the entities
+   * @param entityIds The ids of the entities
+   * @param limit The maximum number of events per entity; null selects the
+   *              default limit
+   * @param windowStart The bound at which iteration stops, or null for none
+   * @param windowEnd The bound at which iteration starts; null selects
+   *                  Long.MAX_VALUE
+   * @param eventType The event types passed on to the event parser
+   * @return The events found, grouped by entity
+   * @throws IOException
+   */
+  @Override
+  public TimelineEvents getEntityTimelines(String entityType,
+      SortedSet<String> entityIds, Long limit, Long windowStart,
+      Long windowEnd, Set<String> eventType) throws IOException {
+    TimelineEvents events = new TimelineEvents();
+    if (entityIds == null || entityIds.isEmpty()) {
+      return events;
+    }
+    // the defaults do not depend on the entity, so resolve them once
+    if (windowEnd == null) {
+      windowEnd = Long.MAX_VALUE;
+    }
+    if (limit == null) {
+      limit = DEFAULT_LIMIT;
+    }
+    // create a lexicographically-ordered map from start time to entities
+    Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
+        List<EntityIdentifier>>(new Comparator<byte[]>() {
+          @Override
+          public int compare(byte[] o1, byte[] o2) {
+            return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
+                o2.length);
+          }
+        });
+    // look up start times for the specified entities
+    // skip entities with no start time
+    for (String entityId : entityIds) {
+      byte[] startTime = getStartTime(entityId, entityType);
+      if (startTime != null) {
+        List<EntityIdentifier> entities = startTimeMap.get(startTime);
+        if (entities == null) {
+          entities = new ArrayList<EntityIdentifier>();
+          startTimeMap.put(startTime, entities);
+        }
+        entities.add(new EntityIdentifier(entityId, entityType));
+      }
+    }
+    for (Entry<byte[], List<EntityIdentifier>> entry :
+        startTimeMap.entrySet()) {
+      // look up the events matching the given parameters (limit,
+      // start time, end time, event types) for entities whose start times
+      // were found and add the entities to the return list
+      byte[] revStartTime = entry.getKey();
+      for (EntityIdentifier entityIdentifier : entry.getValue()) {
+        EventsOfOneEntity entity = new EventsOfOneEntity();
+        entity.setEntityId(entityIdentifier.getId());
+        entity.setEntityType(entityType);
+        events.addEvent(entity);
+        KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+            .add(entityType).add(revStartTime).add(entityIdentifier.getId())
+            .add(EVENTS_COLUMN);
+        byte[] prefix = kb.getBytesForLookup();
+        byte[] revts = writeReverseOrderedLong(windowEnd);
+        kb.add(revts);
+        byte[] first = kb.getBytesForLookup();
+        byte[] last = null;
+        if (windowStart != null) {
+          last = KeyBuilder.newInstance().add(prefix)
+              .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
+        }
+        // one iterator is opened per entity, so each one has to be closed
+        // before moving on; closing only the last one leaks the rest
+        DBIterator iterator = null;
+        try {
+          iterator = db.iterator();
+          for (iterator.seek(first); entity.getEvents().size() < limit &&
+              iterator.hasNext(); iterator.next()) {
+            byte[] key = iterator.peekNext().getKey();
+            if (!prefixMatches(prefix, prefix.length, key) ||
+                (last != null && WritableComparator.compareBytes(key, 0,
+                    key.length, last, 0, last.length) > 0)) {
+              break;
+            }
+            TimelineEvent event = getEntityEvent(eventType, key,
+                prefix.length, iterator.peekNext().getValue());
+            if (event != null) {
+              entity.addEvent(event);
+            }
+          }
+        } finally {
+          IOUtils.cleanup(LOG, iterator);
+        }
+      }
+    }
+    return events;
+  }
+
+  /**
+   * Tests whether the first prefixlen bytes of a byte array equal the first
+   * prefixlen bytes of the given prefix.
+   *
+   * @param prefix The prefix to look for
+   * @param prefixlen The number of leading bytes to compare
+   * @param b The byte array to test
+   * @return true if b is at least prefixlen long and its leading bytes match
+   */
+  private static boolean prefixMatches(byte[] prefix, int prefixlen,
+      byte[] b) {
+    // an array shorter than the prefix can never match
+    return b.length >= prefixlen &&
+        WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
+            prefixlen) == 0;
+  }
+
+  /**
+   * Retrieves entities of one type, optionally restricted by a primary
+   * filter.  The work is delegated to getEntityByTime; the primary filter
+   * only decides which key prefix the lookup is based on.
+   */
+  @Override
+  public TimelineEntities getEntities(String entityType,
+      Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
+      NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
+    final byte[] base;
+    if (primaryFilter != null) {
+      // with a primary filter the lookup is prefixed with
+      // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
+      // ENTITY_ENTRY_PREFIX
+      KeyBuilder kb = KeyBuilder.newInstance();
+      kb.add(INDEXED_ENTRY_PREFIX);
+      kb.add(primaryFilter.getName());
+      kb.add(GenericObjectMapper.write(primaryFilter.getValue()), true);
+      kb.add(ENTITY_ENTRY_PREFIX);
+      base = kb.getBytesForLookup();
+    } else {
+      // without one the lookup is prefixed with ENTITY_ENTRY_PREFIX only
+      base = ENTITY_ENTRY_PREFIX;
+    }
+    return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
+        fromId, fromTs, secondaryFilters, fields);
+  }
+
+  /**
+   * Retrieves a list of entities satisfying given parameters.
+   *
+   * @param base A byte array prefix for the lookup
+   * @param entityType The type of the entity
+   * @param limit A limit on the number of entities to return
+   * @param starttime The earliest entity start time to retrieve (exclusive)
+   * @param endtime The latest entity start time to retrieve (inclusive)
+   * @param fromId Retrieve entities starting with this entity
+   * @param fromTs Ignore entities with insert timestamp later than this ts
+   * @param secondaryFilters Filter pairs that the entities should match; an
+   *                         entity that has neither other info nor a primary
+   *                         filter under a filter's name does not match
+   * @param fields The set of fields to retrieve
+   * @return A list of entities
+   * @throws IOException
+   */
+  private TimelineEntities getEntityByTime(byte[] base,
+      String entityType, Long limit, Long starttime, Long endtime,
+      String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
+      EnumSet<Field> fields) throws IOException {
+    DBIterator iterator = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
+      // only db keys matching the prefix (base + entity type) will be parsed
+      byte[] prefix = kb.getBytesForLookup();
+      if (endtime == null) {
+        // if end time is null, place no restriction on end time
+        endtime = Long.MAX_VALUE;
+      }
+      // construct a first key that will be seeked to using end time or fromId
+      byte[] first = null;
+      if (fromId != null) {
+        Long fromIdStartTime = getStartTimeLong(fromId, entityType);
+        if (fromIdStartTime == null) {
+          // no start time for provided id, so return empty entities
+          return new TimelineEntities();
+        }
+        if (fromIdStartTime <= endtime) {
+          // if provided id's start time falls before the end of the window,
+          // use it to construct the seek key
+          first = kb.add(writeReverseOrderedLong(fromIdStartTime))
+              .add(fromId).getBytesForLookup();
+        }
+      }
+      // if seek key wasn't constructed using fromId, construct it using end ts
+      if (first == null) {
+        first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
+      }
+      byte[] last = null;
+      if (starttime != null) {
+        // if start time is not null, set a last key that will not be
+        // iterated past
+        last = KeyBuilder.newInstance().add(base).add(entityType)
+            .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
+      }
+      if (limit == null) {
+        // if limit is not specified, use the default
+        limit = DEFAULT_LIMIT;
+      }
+
+      TimelineEntities entities = new TimelineEntities();
+      iterator = db.iterator();
+      iterator.seek(first);
+      // iterate until one of the following conditions is met: limit is
+      // reached, there are no more keys, the key prefix no longer matches,
+      // or a start time has been specified and reached/exceeded
+      while (entities.getEntities().size() < limit && iterator.hasNext()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
+            WritableComparator.compareBytes(key, 0, key.length, last, 0,
+                last.length) > 0)) {
+          break;
+        }
+        // read the start time and entity id from the current key
+        KeyParser kp = new KeyParser(key, prefix.length);
+        Long startTime = kp.getNextLong();
+        String entityId = kp.getNextString();
+
+        if (fromTs != null) {
+          long insertTime = readReverseOrderedLong(iterator.peekNext()
+              .getValue(), 0);
+          if (insertTime > fromTs) {
+            // skip every entry of this entity; hasNext() is checked before
+            // each peek so that running off the end of the db ends the
+            // scan instead of throwing NoSuchElementException
+            while (iterator.hasNext() && prefixMatches(key, kp.getOffset(),
+                iterator.peekNext().getKey())) {
+              iterator.next();
+            }
+            continue;
+          }
+        }
+
+        // parse the entity that owns this key, iterating over all keys for
+        // the entity
+        TimelineEntity entity = getEntity(entityId, entityType, startTime,
+            fields, iterator, key, kp.getOffset());
+        // determine if the retrieved entity matches the provided secondary
+        // filters, and if so add it to the list of entities to return
+        boolean filterPassed = true;
+        if (secondaryFilters != null) {
+          for (NameValuePair filter : secondaryFilters) {
+            Object v = entity.getOtherInfo().get(filter.getName());
+            if (v == null) {
+              Set<Object> vs = entity.getPrimaryFilters()
+                  .get(filter.getName());
+              // a name that is absent from both the other info and the
+              // primary filters cannot satisfy the filter
+              if (vs == null || !vs.contains(filter.getValue())) {
+                filterPassed = false;
+                break;
+              }
+            } else if (!v.equals(filter.getValue())) {
+              filterPassed = false;
+              break;
+            }
+          }
+        }
+        if (filterPassed) {
+          entities.addEntity(entity);
+        }
+      }
+      return entities;
+    } finally {
+      IOUtils.cleanup(LOG, iterator);
+    }
+  }
+
+  /**
+   * Put a single entity.  If there is an error, add a TimelinePutError to the
+   * given response.
+   *
+   * The entity's own entries (marker, events, related entities, primary
+   * filters, other info) are collected in one write batch while the lock
+   * for the entity is held.  Related entities for which no start time is
+   * known yet are handled afterwards, one at a time, each under its own
+   * lock and with direct db puts.
+   *
+   * @param entity The entity to store
+   * @param response The response that collects errors
+   */
+  private void put(TimelineEntity entity, TimelinePutResponse response) {
+    LockMap.CountingReentrantLock<EntityIdentifier> lock =
+        writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
+            entity.getEntityType()));
+    lock.lock();
+    WriteBatch writeBatch = null;
+    List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
+        new ArrayList<EntityIdentifier>();
+    byte[] revStartTime = null;
+    try {
+      writeBatch = db.createWriteBatch();
+      List<TimelineEvent> events = entity.getEvents();
+      // look up the start time for the entity
+      StartAndInsertTime startAndInsertTime = getAndSetStartTime(
+          entity.getEntityId(), entity.getEntityType(),
+          entity.getStartTime(), events);
+      if (startAndInsertTime == null) {
+        // if no start time is found, add an error and return
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.NO_START_TIME);
+        response.addError(error);
+        return;
+      }
+      revStartTime = writeReverseOrderedLong(startAndInsertTime
+          .startTime);
+
+      Map<String, Set<Object>> primaryFilters = entity.getPrimaryFilters();
+
+      // write entity marker
+      // (the marker's value is the insert time, which getEntityByTime reads
+      // back when a fromTs is given)
+      byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
+          entity.getEntityType(), revStartTime);
+      byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
+          .insertTime);
+      writeBatch.put(markerKey, markerValue);
+      writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
+          markerValue);
+
+      // write event entries
+      if (events != null && !events.isEmpty()) {
+        for (TimelineEvent event : events) {
+          byte[] revts = writeReverseOrderedLong(event.getTimestamp());
+          byte[] key = createEntityEventKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, revts,
+              event.getEventType());
+          byte[] value = GenericObjectMapper.write(event.getEventInfo());
+          writeBatch.put(key, value);
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+
+      // write related entity entries
+      Map<String, Set<String>> relatedEntities =
+          entity.getRelatedEntities();
+      if (relatedEntities != null && !relatedEntities.isEmpty()) {
+        for (Entry<String, Set<String>> relatedEntityList :
+            relatedEntities.entrySet()) {
+          String relatedEntityType = relatedEntityList.getKey();
+          for (String relatedEntityId : relatedEntityList.getValue()) {
+            // invisible "reverse" entries (entity -> related entity)
+            byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime, relatedEntityId,
+                relatedEntityType);
+            writeBatch.put(key, EMPTY_BYTES);
+            // look up start time of related entity
+            byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+                relatedEntityType);
+            // delay writing the related entity if no start time is found
+            if (relatedEntityStartTime == null) {
+              relatedEntitiesWithoutStartTimes.add(
+                  new EntityIdentifier(relatedEntityId, relatedEntityType));
+              continue;
+            }
+            // write "forward" entry (related entity -> entity)
+            key = createRelatedEntityKey(relatedEntityId,
+                relatedEntityType, relatedEntityStartTime,
+                entity.getEntityId(), entity.getEntityType());
+            writeBatch.put(key, EMPTY_BYTES);
+          }
+        }
+      }
+
+      // write primary filter entries
+      if (primaryFilters != null && !primaryFilters.isEmpty()) {
+        for (Entry<String, Set<Object>> primaryFilter :
+            primaryFilters.entrySet()) {
+          for (Object primaryFilterValue : primaryFilter.getValue()) {
+            byte[] key = createPrimaryFilterKey(entity.getEntityId(),
+                entity.getEntityType(), revStartTime,
+                primaryFilter.getKey(), primaryFilterValue);
+            writeBatch.put(key, EMPTY_BYTES);
+            writePrimaryFilterEntries(writeBatch, primaryFilters, key,
+                EMPTY_BYTES);
+          }
+        }
+      }
+
+      // write other info entries
+      Map<String, Object> otherInfo = entity.getOtherInfo();
+      if (otherInfo != null && !otherInfo.isEmpty()) {
+        for (Entry<String, Object> i : otherInfo.entrySet()) {
+          byte[] key = createOtherInfoKey(entity.getEntityId(),
+              entity.getEntityType(), revStartTime, i.getKey());
+          byte[] value = GenericObjectMapper.write(i.getValue());
+          writeBatch.put(key, value);
+          writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
+        }
+      }
+      // apply everything collected above in a single write
+      db.write(writeBatch);
+    } catch (IOException e) {
+      LOG.error("Error putting entity " + entity.getEntityId() +
+          " of type " + entity.getEntityType(), e);
+      TimelinePutError error = new TimelinePutError();
+      error.setEntityId(entity.getEntityId());
+      error.setEntityType(entity.getEntityType());
+      error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+      response.addError(error);
+    } finally {
+      // release the entity lock before handling the related entities below
+      lock.unlock();
+      writeLocks.returnLock(lock);
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+
+    // the list is only filled after revStartTime has been set, so
+    // revStartTime is non-null whenever this loop has work to do
+    for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
+      lock = writeLocks.getLock(relatedEntity);
+      lock.lock();
+      try {
+        // suggest this entity's start time as the related entity's start
+        // time
+        StartAndInsertTime relatedEntityStartAndInsertTime =
+            getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
+            readReverseOrderedLong(revStartTime, 0), null);
+        if (relatedEntityStartAndInsertTime == null) {
+          throw new IOException("Error setting start time for related entity");
+        }
+        byte[] relatedEntityStartTime = writeReverseOrderedLong(
+            relatedEntityStartAndInsertTime.startTime);
+        // the "forward" entry and the marker are written directly, outside
+        // of any write batch
+        db.put(createRelatedEntityKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime,
+            entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
+        db.put(createEntityMarkerKey(relatedEntity.getId(),
+            relatedEntity.getType(), relatedEntityStartTime),
+            writeReverseOrderedLong(relatedEntityStartAndInsertTime
+                .insertTime));
+      } catch (IOException e) {
+        LOG.error("Error putting related entity " + relatedEntity.getId() +
+            " of type " + relatedEntity.getType() + " for entity " +
+            entity.getEntityId() + " of type " + entity.getEntityType(), e);
+        // the error is reported against the entity being put, not against
+        // the related entity
+        TimelinePutError error = new TimelinePutError();
+        error.setEntityId(entity.getEntityId());
+        error.setEntityType(entity.getEntityType());
+        error.setErrorCode(TimelinePutError.IO_EXCEPTION);
+        response.addError(error);
+      } finally {
+        lock.unlock();
+        writeLocks.returnLock(lock);
+      }
+    }
+  }
+
+  /**
+   * Adds, for a key / value pair that is being written, one additional
+   * entry per primary filter value to the write batch.  The additional keys
+   * are derived from the original key by addPrimaryFilterToKey.
+   *
+   * @param writeBatch The batch the entries are added to
+   * @param primaryFilters The primary filters of the entity, may be null
+   * @param key The key of the original entry
+   * @param value The value stored under every additional key
+   * @throws IOException
+   */
+  private static void writePrimaryFilterEntries(WriteBatch writeBatch,
+      Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
+      throws IOException {
+    if (primaryFilters == null || primaryFilters.isEmpty()) {
+      // nothing to index
+      return;
+    }
+    for (Entry<String, Set<Object>> filter : primaryFilters.entrySet()) {
+      String filterName = filter.getKey();
+      for (Object filterValue : filter.getValue()) {
+        byte[] indexedKey =
+            addPrimaryFilterToKey(filterName, filterValue, key);
+        writeBatch.put(indexedKey, value);
+      }
+    }
+  }
+
+  /**
+   * Stores the given entities one by one while holding the read side of the
+   * delete lock.  Failures of individual entities are reported through the
+   * returned response.
+   *
+   * @param entities The entities to store
+   * @return A response listing the errors that occurred
+   */
+  @Override
+  public TimelinePutResponse put(TimelineEntities entities) {
+    // acquire before entering the try block: if lock() itself fails, the
+    // finally clause must not unlock a lock that was never obtained
+    deleteLock.readLock().lock();
+    try {
+      TimelinePutResponse response = new TimelinePutResponse();
+      for (TimelineEntity entity : entities.getEntities()) {
+        put(entity, response);
+      }
+      return response;
+    } finally {
+      deleteLock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Looks up the start time of an entity and encodes it as a byte array
+   * that sorts timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}).
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @return The encoded start time, null if not found
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    Long startTime = getStartTimeLong(entityId, entityType);
+    if (startTime == null) {
+      return null;
+    }
+    return writeReverseOrderedLong(startTime);
+  }
+
+  /**
+   * Get the unique start time for a given entity as a Long.  The read cache
+   * is consulted first; on a miss the db is queried and a value found there
+   * is added to the cache.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @return A Long, null if not found
+   * @throws IOException
+   */
+  private Long getStartTimeLong(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // a single get() avoids the containsKey()/get() race: the LRU cache is
+    // shared, so another thread could evict the entry between the two calls
+    // and a null would wrongly be reported as "not found"
+    Long cached = startTimeReadCache.get(entity);
+    if (cached != null) {
+      // found the start time in the cache
+      return cached;
+    }
+    // try to look up the start time in the db
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    byte[] v = db.get(b);
+    if (v == null) {
+      // did not find the start time in the db
+      return null;
+    }
+    // found the start time in the db
+    Long l = readReverseOrderedLong(v, 0);
+    startTimeReadCache.put(entity, l);
+    return l;
+  }
+
+  /**
+   * Get the start and insert time for a given entity.  If the start time
+   * doesn't exist, set it based on the information provided. Should only be
+   * called when a lock has been obtained on the entity.
+   *
+   * A cached value always wins.  Otherwise the provided start time, or if
+   * none is provided the smallest timestamp among the provided events, is
+   * handed to checkStartTimeInDb as the suggestion.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
+   * @param startTime The start time of the entity, or null
+   * @param events A list of events for the entity, or null
+   * @return A StartAndInsertTime, null if no start time exists and none
+   *         could be suggested
+   * @throws IOException
+   */
+  private StartAndInsertTime getAndSetStartTime(String entityId,
+      String entityType, Long startTime, List<TimelineEvent> events)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // a single get() avoids the containsKey()/get() race: the per-entity
+    // lock does not stop puts for other entities from evicting this entry
+    // from the shared LRU cache between the two calls
+    StartAndInsertTime cached = startTimeWriteCache.get(entity);
+    if (cached != null) {
+      // always use start time from cache if it exists
+      return cached;
+    }
+    if (startTime == null && events != null) {
+      // start time is not provided, so prepare one from the events in case
+      // it is needed
+      // NOTE(review): an empty event list yields Long.MAX_VALUE, exactly as
+      // before this change -- confirm that this is intended
+      long min = Long.MAX_VALUE;
+      for (TimelineEvent e : events) {
+        if (min > e.getTimestamp()) {
+          min = e.getTimestamp();
+        }
+      }
+      startTime = min;
+    }
+    // the db has the final say; the suggestion is only used if the db has
+    // no start time for the entity yet
+    return checkStartTimeInDb(entity, startTime);
+  }
+
+  /**
+   * Looks up the start time of an entity in the db. When an entry exists
+   * it is returned and the suggested start time is ignored; when none
+   * exists the suggested start time (if not null) is written together with
+   * the current time as insert time. This is only called after a cache
+   * miss, so the result is put into both start time caches. Should only be
+   * called when a lock has been obtained on the entity.
+   *
+   * @return the start and insert time, or null if the db has no entry and
+   *         no start time was suggested
+   */
+  private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime) throws IOException {
+    byte[] lookupKey =
+        createStartTimeLookupKey(entity.getId(), entity.getType());
+    byte[] stored = db.get(lookupKey);
+    StartAndInsertTime result;
+    if (stored != null) {
+      // an existing db entry always wins over the suggested start time
+      result = new StartAndInsertTime(readReverseOrderedLong(stored, 0),
+          readReverseOrderedLong(stored, 8));
+    } else if (suggestedStartTime == null) {
+      // nothing stored and nothing to store
+      return null;
+    } else {
+      result = new StartAndInsertTime(suggestedStartTime,
+          System.currentTimeMillis());
+
+      // value layout: start time at offset 0, insert time at offset 8
+      byte[] encoded = new byte[16];
+      writeReverseOrderedLong(suggestedStartTime, encoded, 0);
+      writeReverseOrderedLong(result.insertTime, encoded, 8);
+      WriteOptions syncWrite = new WriteOptions();
+      syncWrite.sync(true);
+      db.put(lookupKey, encoded, syncWrite);
+    }
+    startTimeWriteCache.put(entity, result);
+    startTimeReadCache.put(entity, result.startTime);
+    return result;
+  }
+
+  /**
+   * Builds the key under which the start time of an entity is stored:
+   * START_TIME_LOOKUP_PREFIX + entity type + entity id.
+   */
+  private static byte[] createStartTimeLookupKey(String entityId,
+      String entityType) throws IOException {
+    KeyBuilder lookupKey = KeyBuilder.newInstance()
+        .add(START_TIME_LOOKUP_PREFIX)
+        .add(entityType)
+        .add(entityId);
+    return lookupKey.getBytes();
+  }
+
+  /**
+   * Builds an entity marker key in its lookup form: ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id.
+   */
+  private static byte[] createEntityMarkerKey(String entityId,
+      String entityType, byte[] revStartTime) throws IOException {
+    KeyBuilder markerKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId);
+    return markerKey.getBytesForLookup();
+  }
+
+  /**
+   * Wraps the given key into an index entry key: INDEXED_ENTRY_PREFIX +
+   * primaryfiltername + primaryfiltervalue + key. The filter value is
+   * serialized with GenericObjectMapper before it is appended.
+   */
+  private static byte[] addPrimaryFilterToKey(String primaryFilterName,
+      Object primaryFilterValue, byte[] key) throws IOException {
+    KeyBuilder indexKey = KeyBuilder.newInstance()
+        .add(INDEXED_ENTRY_PREFIX)
+        .add(primaryFilterName)
+        .add(GenericObjectMapper.write(primaryFilterValue), true)
+        .add(key);
+    return indexKey.getBytes();
+  }
+
+  /**
+   * Builds the key of a single event of an entity: ENTITY_ENTRY_PREFIX +
+   * entity type + revstarttime + entity id + EVENTS_COLUMN +
+   * reveventtimestamp + event type.
+   */
+  private static byte[] createEntityEventKey(String entityId,
+      String entityType, byte[] revStartTime, byte[] revEventTimestamp,
+      String eventType) throws IOException {
+    KeyBuilder eventKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId)
+        .add(EVENTS_COLUMN)
+        .add(revEventTimestamp)
+        .add(eventType);
+    return eventKey.getBytes();
+  }
+
+  /**
+   * Builds an event object from the given key, offset, and value. The
+   * timestamp and event type are parsed from the key starting at the
+   * offset; the event info is deserialized from the value.
+   *
+   * @return the event, or null if a set of event types was specified and
+   *         it does not contain the event's type
+   * @throws IOException if the value deserializes to something other than
+   *         null or a Map
+   */
+  private static TimelineEvent getEntityEvent(Set<String> eventTypes,
+      byte[] key, int offset, byte[] value) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    long timestamp = parser.getNextLong();
+    String eventType = parser.getNextString();
+    if (eventTypes != null && !eventTypes.contains(eventType)) {
+      // the caller restricted the event types and this one is not wanted
+      return null;
+    }
+
+    TimelineEvent event = new TimelineEvent();
+    event.setTimestamp(timestamp);
+    event.setEventType(eventType);
+
+    Object info = GenericObjectMapper.read(value);
+    if (info != null && !(info instanceof Map)) {
+      throw new IOException("Couldn't deserialize event info map");
+    }
+    // info is either null or a Map at this point
+    @SuppressWarnings("unchecked")
+    Map<String, Object> infoMap = (Map<String, Object>) info;
+    event.setEventInfo(infoMap);
+    return event;
+  }
+
+  /**
+   * Builds the key of a primary filter entry of an entity:
+   * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+   * PRIMARY_FILTERS_COLUMN + name + value. The value is serialized with
+   * GenericObjectMapper before it is appended.
+   */
+  private static byte[] createPrimaryFilterKey(String entityId,
+      String entityType, byte[] revStartTime, String name, Object value)
+      throws IOException {
+    KeyBuilder filterKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId)
+        .add(PRIMARY_FILTERS_COLUMN)
+        .add(name)
+        .add(GenericObjectMapper.write(value));
+    return filterKey.getBytes();
+  }
+
+  /**
+   * Reads a primary filter from the given key, starting at the given
+   * offset, and adds it to the given entity. The filter name is parsed
+   * first; the filter value is deserialized from the bytes that follow it.
+   */
+  private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    String filterName = parser.getNextString();
+    // the serialized value starts where the parser stopped
+    int valueOffset = parser.getOffset();
+    entity.addPrimaryFilter(filterName,
+        GenericObjectMapper.read(key, valueOffset));
+  }
+
+  /**
+   * Builds the key of an other info entry of an entity:
+   * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+   * OTHER_INFO_COLUMN + name.
+   */
+  private static byte[] createOtherInfoKey(String entityId, String entityType,
+      byte[] revStartTime, String name) throws IOException {
+    KeyBuilder otherInfoKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId)
+        .add(OTHER_INFO_COLUMN)
+        .add(name);
+    return otherInfoKey.getBytes();
+  }
+
+  /**
+   * Decodes the bytes from the given offset to the end of the array into a
+   * string (for parsing other info keys).
+   */
+  private static String parseRemainingKey(byte[] b, int offset) {
+    int remaining = b.length - offset;
+    // NOTE(review): decodes with the platform default charset; confirm it
+    // matches the charset used when the key was written
+    return new String(b, offset, remaining);
+  }
+
+  /**
+   * Builds the key of a related entity entry of an entity:
+   * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+   * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id.
+   */
+  private static byte[] createRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    KeyBuilder relatedKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId)
+        .add(RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType)
+        .add(relatedEntityId);
+    return relatedKey.getBytes();
+  }
+
+  /**
+   * Reads a related entity from the given key, starting at the given
+   * offset, and adds it to the given entity. The related entity's type is
+   * parsed first, followed by its id.
+   */
+  private static void addRelatedEntity(TimelineEntity entity, byte[] key,
+      int offset) throws IOException {
+    KeyParser parser = new KeyParser(key, offset);
+    String relatedType = parser.getNextString();
+    String relatedId = parser.getNextString();
+    entity.addRelatedEntity(relatedType, relatedId);
+  }
+
+  /**
+   * Builds the key of a reverse related entity entry of an entity:
+   * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
+   * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
+   * relatedentity id.
+   */
+  private static byte[] createReverseRelatedEntityKey(String entityId,
+      String entityType, byte[] revStartTime, String relatedEntityId,
+      String relatedEntityType) throws IOException {
+    KeyBuilder reverseKey = KeyBuilder.newInstance()
+        .add(ENTITY_ENTRY_PREFIX)
+        .add(entityType)
+        .add(revStartTime)
+        .add(entityId)
+        .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
+        .add(relatedEntityType)
+        .add(relatedEntityId);
+    return reverseKey.getBytes();
+  }
+
+  /**
+   * Empties both start time caches so that tests can exercise reloading
+   * start times from leveldb (only for testing).
+   */
+  @VisibleForTesting
+  void clearStartTimeCache() {
+    startTimeReadCache.clear();
+    startTimeWriteCache.clear();
+  }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        YarnConfiguration.
+            DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
+  }
+
+  /**
+   * Collects the entity types found under ENTITY_ENTRY_PREFIX in the db,
+   * in iteration order. After recording a type, the iterator is re-seeked
+   * using that type's lookup key with its trailing 0x0 byte replaced by
+   * 0x1, rather than stepping through the keys one by one.
+   *
+   * @throws IOException if a lookup key does not end with the 0x0 byte
+   */
+  // warning is suppressed to prevent eclipse from noting unclosed resource
+  @SuppressWarnings("resource")
+  @VisibleForTesting
+  List<String> getEntityTypes() throws IOException {
+    List<String> types = new ArrayList<String>();
+    DBIterator dbIterator = null;
+    try {
+      dbIterator = getDbIterator(false);
+      dbIterator.seek(ENTITY_ENTRY_PREFIX);
+      while (dbIterator.hasNext()) {
+        byte[] currentKey = dbIterator.peekNext().getKey();
+        if (currentKey[0] != ENTITY_ENTRY_PREFIX[0]) {
+          // first byte no longer matches the entity entry prefix
+          break;
+        }
+        KeyParser parser =
+            new KeyParser(currentKey, ENTITY_ENTRY_PREFIX.length);
+        String type = parser.getNextString();
+        types.add(type);
+
+        // seek target: lookup key of this type with its last byte bumped
+        byte[] seekKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+            .add(type).getBytesForLookup();
+        int last = seekKey.length - 1;
+        if (seekKey[last] != 0x0) {
+          throw new IOException("Found unexpected end byte in lookup key");
+        }
+        seekKey[last] = 0x1;
+        dbIterator.seek(seekKey);
+      }
+      return types;
+    } finally {
+      IOUtils.cleanup(LOG, dbIterator);
+    }
+  }
+
+  /**
+   * Seeks the iterator to the given prefix and adds a delete to the write
+   * batch for every following key that matches the prefix, stopping at the
+   * first key that does not match.
+   */
+  private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
+      DBIterator iterator) {
+    iterator.seek(prefix);
+    while (iterator.hasNext()) {
+      byte[] candidate = iterator.peekNext().getKey();
+      if (!prefixMatches(prefix, prefix.length, candidate)) {
+        return;
+      }
+      writeBatch.delete(candidate);
+      iterator.next();
+    }
+  }
+
+  /**
+   * Deletes the entity at the iterator's position, provided the next key
+   * still carries the ENTITY_ENTRY_PREFIX + entity type prefix. All
+   * deletes are collected in one write batch that is written with sync
+   * enabled: the entity's start time lookup entry, every key sharing the
+   * entity's key prefix, the index entries of its primary filters, and the
+   * matching entries stored under its related and reverse related
+   * entities. The entity is also removed from both start time caches.
+   *
+   * @param entityType The entity type whose entities are being deleted
+   * @param reverseTimestamp Key component appended after the entity type
+   *          to build the seek target when seeked is false
+   * @param iterator Iterator used to walk the entity's own entries
+   * @param pfIterator Separate iterator passed to deleteKeysWithPrefix for
+   *          the primary filter index entries
+   * @param seeked If false, iterator is first positioned with a seek; if
+   *          true, it is used at its current position
+   * @return true if an entity was deleted; false if the iterator has no
+   *         next entry or the next key does not match the type prefix
+   * @throws IOException
+   */
+  @VisibleForTesting
+  boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
+      DBIterator iterator, DBIterator pfIterator, boolean seeked)
+      throws IOException {
+    WriteBatch writeBatch = null;
+    try {
+      KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
+          .add(entityType);
+      byte[] typePrefix = kb.getBytesForLookup();
+      kb.add(reverseTimestamp);
+      if (!seeked) {
+        iterator.seek(kb.getBytesForLookup());
+      }
+      if (!iterator.hasNext()) {
+        return false;
+      }
+      byte[] entityKey = iterator.peekNext().getKey();
+      if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
+        // next key belongs to a different entity type (or key section)
+        return false;
+      }
+
+      // read the start time and entity id from the current key
+      // (the parser starts 8 bytes past the type prefix)
+      KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
+      String entityId = kp.getNextString();
+      int prefixlen = kp.getOffset();
+      // deletePrefix is the part of the key up to the end of the entity id
+      byte[] deletePrefix = new byte[prefixlen];
+      System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);
+
+      writeBatch = db.createWriteBatch();
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
+      }
+      // remove start time from cache and db
+      writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
+      EntityIdentifier entityIdentifier =
+          new EntityIdentifier(entityId, entityType);
+      startTimeReadCache.remove(entityIdentifier);
+      startTimeWriteCache.remove(entityIdentifier);
+
+      // delete current entity
+      // (each "continue" below still advances the iterator through the
+      // for-loop update)
+      for (; iterator.hasNext(); iterator.next()) {
+        byte[] key = iterator.peekNext().getKey();
+        if (!prefixMatches(entityKey, prefixlen, key)) {
+          break;
+        }
+        writeBatch.delete(key);
+
+        if (key.length == prefixlen) {
+          // nothing follows the entity prefix, so there is no column to
+          // inspect
+          continue;
+        }
+        if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
+          // primary filter entry: also delete the index entries built
+          // from this filter and the entity's key prefix
+          kp = new KeyParser(key,
+              prefixlen + PRIMARY_FILTERS_COLUMN.length);
+          String name = kp.getNextString();
+          Object value = GenericObjectMapper.read(key, kp.getOffset());
+          deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
+              deletePrefix), pfIterator);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " primary filter entry " + name + " " +
+                value);
+          }
+        } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
+          // related entity entry: delete the reverse related entity entry
+          // stored under the related entity
+          kp = new KeyParser(key,
+              prefixlen + RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createReverseRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from invisible reverse related entity " +
+                "entry of type:" + type + " id:" + id);
+          }
+        } else if (key[prefixlen] ==
+            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
+          // reverse related entity entry: delete the related entity entry
+          // stored under the other entity
+          kp = new KeyParser(key, prefixlen +
+              INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
+          String type = kp.getNextString();
+          String id = kp.getNextString();
+          byte[] relatedEntityStartTime = getStartTime(id, type);
+          if (relatedEntityStartTime == null) {
+            LOG.warn("Found no start time for reverse " +
+                "related entity " + id + " of type " + type + " while " +
+                "deleting " + entityId + " of type " + entityType);
+            continue;
+          }
+          writeBatch.delete(createRelatedEntityKey(id, type,
+              relatedEntityStartTime, entityId, entityType));
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Deleting entity type:" + entityType + " id:" +
+                entityId + " from related entity entry of type:" +
+                type + " id:" + id);
+          }
+        }
+      }
+      // apply all collected deletes in a single synced write
+      WriteOptions writeOptions = new WriteOptions();
+      writeOptions.sync(true);
+      db.write(writeBatch, writeOptions);
+      return true;
+    } finally {
+      IOUtils.cleanup(LOG, writeBatch);
+    }
+  }
+
+  /**
+   * Discards entities with start timestamp less than or equal to the given
+   * timestamp.
+   */
+  @VisibleForTesting
+  void discardOldEntities(long timestamp)
+      throws IOException, InterruptedException {
+    byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
+    long totalCount = 0;
+    long t1 = System.currentTimeMillis();
+    try {
+      List<String> entityTypes = getEntityTypes();
+      for (String entityType : entityTypes) {
+        DBIterator iterator = null;
+        DBIterator pfIterator = null;
+        long typeCount = 0;
+        try {
+          deleteLock.writeLock().lock();
+          iterator = getDbIterator(false);
+          pfIterator = getDbIterator(false);
+
+          if (deletionThread != null && deletionThread.isInterrupted()) {
+            throw new InterruptedException();
+          }
+          boolean seeked = false;
+          while (deleteNextEntity(entityType, reverseTimestamp, iterator,
+              pfIterator, seeked)) {
+            typeCount++;
+            totalCount++;
+            seeked = true;
+            if (deletionThread != null && deletionThread.isInterrupted()) {
+              throw new InterruptedException();
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("Got IOException while deleting entities for type " +
+              entityType + ", continuing to next type", e);
+        } finally {
+          IOUtils.cleanup(LOG, iterator, pfIterator);
+          deleteLock.writeLock().unlock();
+          if (typeCount > 0) {
+            LOG.info("Deleted " + typeCount + " entities of type " +
+                entityType);
+          }
+        }
+      }
+    } finally {
+      long t2 = System.currentTimeMillis();
+      LOG.info("Discarded " + totalCount + " entities for timestamp " +
+          timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
+    }
+  }
+
+  /**
+   * Opens a new iterator over the db.
+   *
+   * @param fillCache value passed to ReadOptions.fillCache for the reads
+   *          made through the returned iterator
+   * @return a new iterator; closing it is left to the caller
+   */
+  @VisibleForTesting
+  DBIterator getDbIterator(boolean fillCache) {
+    ReadOptions options = new ReadOptions();
+    options.fillCache(fillCache);
+    return db.iterator(options);
+  }
+}



Mime
View raw message