hadoop-common-commits mailing list archives

From: zjs...@apache.org
Subject: [1/3] hadoop git commit: YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles.
Date: Thu, 07 May 2015 17:10:47 GMT
Repository: hadoop
Updated Branches:
  refs/heads/branch-2 684a5a6ae -> bb035ff08
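
This message carries the new tests for the rolling store. As a rough orientation before the diffs, the hypothetical sketch below shows how the tests bring the store up: point it at a local LevelDB directory, optionally disable the TTL eviction thread, then init() and start() the service. It is not part of the patch; the directory path is a made-up example value, while the configuration keys are the ones the tests themselves set.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDBTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;

public class RollingStoreStartupSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    // Same keys the tests below use; the path value is a made-up example.
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        "/tmp/timeline-rolling-leveldb");
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);

    TimelineStore store = new RollingLevelDBTimelineStore();
    store.init(conf);  // rejects invalid TTL/cache settings (see testValidateConfig)
    store.start();
    try {
      // put()/getEntity() calls would go here, as exercised by the tests below.
    } finally {
      store.stop();    // closing releases the LevelDB database lock
    }
  }
}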


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb035ff0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
index c5c0f93..121e9f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java
@@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
-import org.apache.hadoop.yarn.server.timeline.NameValuePair;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
 import org.iq80.leveldb.DBException;
 import org.junit.After;
@@ -155,7 +153,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
       return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
           iterator, pfIterator, false);
     } catch(DBException e) {
-      throw new IOException(e);   	
+      throw new IOException(e);
     } finally {
       IOUtils.cleanup(null, iterator, pfIterator);
     }
@@ -179,12 +177,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntities("type_2").size());
 
     assertEquals(false, deleteNextEntity(entityType1,
-        writeReverseOrderedLong(60l)));
+        writeReverseOrderedLong(60L)));
     assertEquals(3, getEntities("type_1").size());
     assertEquals(1, getEntities("type_2").size());
 
     assertEquals(true, deleteNextEntity(entityType1,
-        writeReverseOrderedLong(123l)));
+        writeReverseOrderedLong(123L)));
     List<TimelineEntity> entities = getEntities("type_2");
     assertEquals(1, entities.size());
     verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
@@ -198,12 +196,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(1), domainId2);
 
-    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(0L);
     assertEquals(2, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
 
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@@ -240,11 +238,11 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
         primaryFilters, otherInfo, entities.get(2), domainId2);
 
-    ((LeveldbTimelineStore)store).discardOldEntities(-123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(-123L);
     assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
 
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntities("type_1").size());
     assertEquals(0, getEntities("type_2").size());
     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
@@ -261,7 +259,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntitiesFromTs("type_2", l).size());
     assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
         l).size());
-    ((LeveldbTimelineStore)store).discardOldEntities(123l);
+    ((LeveldbTimelineStore)store).discardOldEntities(123L);
     assertEquals(0, getEntitiesFromTs("type_1", l).size());
     assertEquals(0, getEntitiesFromTs("type_2", l).size());
     assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
@@ -279,7 +277,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     assertEquals(1, getEntities("type_2").size());
     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
   }
-  
+
   @Test
   public void testCheckVersion() throws IOException {
     LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
@@ -299,16 +297,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
     Assert.assertEquals(defaultVersion, dbStore.loadVersion());
 
     // incompatible version
-    Version incompatibleVersion =
-      Version.newInstance(defaultVersion.getMajorVersion() + 1,
-          defaultVersion.getMinorVersion());
+    Version incompatibleVersion = Version.newInstance(
+        defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion());
     dbStore.storeVersion(incompatibleVersion);
     try {
       restartTimelineStore();
       Assert.fail("Incompatible version, should expect fail here.");
     } catch (ServiceStateException e) {
-      Assert.assertTrue("Exception message mismatch", 
-        e.getMessage().contains("Incompatible version for timeline store"));
+      Assert.assertTrue("Exception message mismatch",
+          e.getMessage().contains("Incompatible version for timeline store"));
     }
   }
 

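The testCheckVersion hunk above is only reflowed, but together with the matching test in the new rolling-store file further down it pins the compatibility rule the stores follow: a stored version with a different major number makes restart fail with ServiceStateException ("Incompatible version for timeline store"), while a minor-version bump is tolerated and overwritten with the current default. A minimal sketch of that rule with a hypothetical helper (the real check lives inside the stores' own version handling):

import org.apache.hadoop.yarn.server.records.Version;

final class VersionCompatibilitySketch {
  private VersionCompatibilitySketch() {
  }

  // Hypothetical helper mirroring what the version tests assert:
  // equal major version => compatible (minor differences tolerated),
  // different major version => incompatible, the store refuses to start.
  static boolean isCompatible(Version stored, Version current) {
    return stored.getMajorVersion() == current.getMajorVersion();
  }

  public static void main(String[] args) {
    Version current = Version.newInstance(1, 0);
    System.out.println(isCompatible(Version.newInstance(1, 2), current)); // true
    System.out.println(isCompatible(Version.newInstance(2, 0), current)); // false
  }
}
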
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb035ff0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java
new file mode 100644
index 0000000..d2d0860
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.iq80.leveldb.DB;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/** Test class for verification of RollingLevelDB. */
+public class TestRollingLevelDB {
+  private Configuration conf = new YarnConfiguration();
+  private FileSystem lfs;
+  private MyRollingLevelDB rollingLevelDB;
+
+  /** RollingLevelDB for testing that allows setting the current time. */
+  public static class MyRollingLevelDB extends RollingLevelDB {
+    private long currentTimeMillis;
+
+    MyRollingLevelDB() {
+      super("Test");
+      this.currentTimeMillis = System.currentTimeMillis();
+    }
+
+    @Override
+    protected long currentTimeMillis() {
+      return currentTimeMillis;
+    }
+
+    public void setCurrentTimeMillis(long time) {
+      this.currentTimeMillis = time;
+    }
+  };
+
+  @Before
+  public void setup() throws Exception {
+    lfs = FileSystem.getLocal(conf);
+    File fsPath = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        fsPath.getAbsolutePath());
+    lfs.delete(new Path(fsPath.getAbsolutePath()), true);
+    rollingLevelDB = new MyRollingLevelDB();
+  }
+
+  @Test
+  public void testInsertAfterRollPeriodRollsDB() throws Exception {
+
+    rollingLevelDB.init(conf);
+    long now = rollingLevelDB.currentTimeMillis();
+    DB db = rollingLevelDB.getDBForStartTime(now);
+    long startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now),
+        startTime);
+    now = rollingLevelDB.getNextRollingTimeMillis();
+    rollingLevelDB.setCurrentTimeMillis(now);
+    db = rollingLevelDB.getDBForStartTime(now);
+    startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now),
+        startTime);
+  }
+
+  @Test
+  public void testInsertForPreviousPeriodAfterRollPeriodRollsDB()
+      throws Exception {
+
+    rollingLevelDB.init(conf);
+    long now = rollingLevelDB.currentTimeMillis();
+    now = rollingLevelDB.computeCurrentCheckMillis(now);
+    rollingLevelDB.setCurrentTimeMillis(now);
+    DB db = rollingLevelDB.getDBForStartTime(now - 1);
+    long startTime = rollingLevelDB.getStartTimeFor(db);
+    Assert.assertEquals("Received level db for incorrect start time",
+        rollingLevelDB.computeCurrentCheckMillis(now - 1),
+        startTime);
+  }
+}
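
TestRollingLevelDB above drives the store through a roll boundary by overriding currentTimeMillis(). The RollingLevelDB implementation is not in this message, but the behaviour the test asserts is time bucketing: computeCurrentCheckMillis() rounds a timestamp down to the start of its roll period, and getDBForStartTime() returns the LevelDB instance owning that bucket, rolling to a new one once getNextRollingTimeMillis() passes. The sketch below only illustrates that rounding; the hourly period is an assumption made for the example, not the store's actual (configurable) setting.

import java.util.concurrent.TimeUnit;

final class RollPeriodSketch {
  private RollPeriodSketch() {
  }

  // Illustrative rounding only: map a timestamp to the start of its bucket,
  // the property the assertions above check via computeCurrentCheckMillis().
  static long bucketStartMillis(long timeMillis, long rollPeriodMillis) {
    return timeMillis - (timeMillis % rollPeriodMillis);
  }

  public static void main(String[] args) {
    long period = TimeUnit.HOURS.toMillis(1);  // assumed period, for illustration
    long now = System.currentTimeMillis();
    long start = bucketStartMillis(now, period);
    // A timestamp just before the bucket start lands in the previous bucket,
    // matching testInsertForPreviousPeriodAfterRollPeriodRollsDB.
    System.out.println(start + " " + bucketStartMillis(start - 1, period));
  }
}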

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb035ff0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java
new file mode 100644
index 0000000..956e9e9
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java
@@ -0,0 +1,427 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.timeline;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.ServiceStateException;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.records.Version;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mortbay.log.Log;
+
+/** Test class to verify RollingLevelDBTimelineStore. */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
+  private FileContext fsContext;
+  private File fsPath;
+  private Configuration config = new YarnConfiguration();
+
+  @Before
+  public void setup() throws Exception {
+    fsContext = FileContext.getLocalFSFileContext();
+    fsPath = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir").getAbsoluteFile();
+    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+    config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        fsPath.getAbsolutePath());
+    config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
+    store = new RollingLevelDBTimelineStore();
+    store.init(config);
+    store.start();
+    loadTestEntityData();
+    loadVerificationEntityData();
+    loadTestDomainData();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    store.stop();
+    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
+  }
+
+  @Test
+  public void testRootDirPermission() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new YarnConfiguration());
+    FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(),
+        RollingLevelDBTimelineStore.FILENAME));
+    assertNotNull(file);
+    assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK,
+        file.getPermission());
+  }
+
+  @Test
+  public void testGetSingleEntity() throws IOException {
+    super.testGetSingleEntity();
+    ((RollingLevelDBTimelineStore)store).clearStartTimeCache();
+    super.testGetSingleEntity();
+    loadTestEntityData();
+  }
+
+  @Test
+  public void testGetEntities() throws IOException {
+    super.testGetEntities();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromId() throws IOException {
+    super.testGetEntitiesWithFromId();
+  }
+
+  @Test
+  public void testGetEntitiesWithFromTs() throws IOException {
+    // feature not supported
+  }
+
+  @Test
+  public void testGetEntitiesWithPrimaryFilters() throws IOException {
+    super.testGetEntitiesWithPrimaryFilters();
+  }
+
+  @Test
+  public void testGetEntitiesWithSecondaryFilters() throws IOException {
+    super.testGetEntitiesWithSecondaryFilters();
+  }
+
+  @Test
+  public void testGetEvents() throws IOException {
+    super.testGetEvents();
+  }
+
+  @Test
+  public void testCacheSizes() {
+    Configuration conf = new Configuration();
+    assertEquals(10000,
+        RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
+    assertEquals(10000,
+        RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        10001);
+    assertEquals(10001,
+        RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf));
+    conf = new Configuration();
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        10002);
+    assertEquals(10002,
+        RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf));
+  }
+
+  @Test
+  public void testCheckVersion() throws IOException {
+    RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store;
+    // default version
+    Version defaultVersion = dbStore.getCurrentVersion();
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // compatible version
+    Version compatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion(),
+          defaultVersion.getMinorVersion() + 2);
+    dbStore.storeVersion(compatibleVersion);
+    Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
+    restartTimelineStore();
+    dbStore = (RollingLevelDBTimelineStore) store;
+    // overwrite the compatible version
+    Assert.assertEquals(defaultVersion, dbStore.loadVersion());
+
+    // incompatible version
+    Version incompatibleVersion =
+        Version.newInstance(defaultVersion.getMajorVersion() + 1,
+          defaultVersion.getMinorVersion());
+    dbStore.storeVersion(incompatibleVersion);
+    try {
+      restartTimelineStore();
+      Assert.fail("Incompatible version, should expect fail here.");
+    } catch (ServiceStateException e) {
+      Assert.assertTrue("Exception message mismatch",
+          e.getMessage().contains("Incompatible version for timeline store"));
+    }
+  }
+
+  @Test
+  public void testValidateConfig() throws IOException {
+    Configuration copyConfig = new YarnConfiguration(config);
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_TTL_MS));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+          0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE));
+    }
+    try {
+      Configuration newConfig = new YarnConfiguration(copyConfig);
+      newConfig.setLong(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+          0);
+      config = newConfig;
+      restartTimelineStore();
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertTrue(e.getMessage().contains(
+          YarnConfiguration
+          .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE));
+    }
+    config = copyConfig;
+    restartTimelineStore();
+  }
+
+  private void restartTimelineStore() throws IOException {
+    // need to close so leveldb releases database lock
+    if (store != null) {
+      store.close();
+    }
+    store = new RollingLevelDBTimelineStore();
+    store.init(config);
+    store.start();
+  }
+
+  @Test
+  public void testGetDomain() throws IOException {
+    super.testGetDomain();
+  }
+
+  @Test
+  public void testGetDomains() throws IOException {
+    super.testGetDomains();
+  }
+
+  @Test
+  public void testRelatingToNonExistingEntity() throws IOException {
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
+    entityToStore.setEntityId("TEST_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+    TimelineEntity entityToGet =
+        store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    Assert.assertEquals("TEST_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("TEST_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+  }
+
+  @Test
+  public void testRelatingToEntityInSamePut() throws IOException {
+    TimelineEntity entityToRelate = new TimelineEntity();
+    entityToRelate.setEntityType("TEST_ENTITY_TYPE_2");
+    entityToRelate.setEntityId("TEST_ENTITY_ID_2");
+    entityToRelate.setDomainId("TEST_DOMAIN");
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
+    entityToStore.setEntityId("TEST_ENTITY_ID_1");
+    entityToStore.setDomainId("TEST_DOMAIN");
+    entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    entities.addEntity(entityToRelate);
+    store.put(entities);
+    TimelineEntity entityToGet =
+        store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId());
+    Assert.assertEquals("TEST_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("TEST_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+  }
+
+  @Test
+  public void testRelatingToOldEntityWithoutDomainId() throws IOException {
+    // New entity is put in the default domain
+    TimelineEntity entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
+    entityToStore.setEntityId("NEW_ENTITY_ID_1");
+    entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    store.put(entities);
+
+    TimelineEntity entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    Assert.assertEquals("NEW_ENTITY_TYPE_1",
+        entityToGet.getRelatedEntities().keySet().iterator().next());
+    Assert.assertEquals("NEW_ENTITY_ID_1",
+        entityToGet.getRelatedEntities().values().iterator().next()
+            .iterator().next());
+
+    // New entity is not put in the default domain
+    entityToStore = new TimelineEntity();
+    entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
+    entityToStore.setEntityId("NEW_ENTITY_ID_2");
+    entityToStore.setDomainId("NON_DEFAULT");
+    entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
+    entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+    TimelinePutResponse response = store.put(entities);
+    Assert.assertEquals(1, response.getErrors().size());
+    Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
+        response.getErrors().get(0).getErrorCode());
+    entityToGet =
+        store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
+    Assert.assertNotNull(entityToGet);
+    Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
+    // Still have one related entity
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
+    Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
+        .iterator().next().size());
+  }
+
+  public void testStorePerformance() throws IOException {
+    TimelineEntity entityToStorePrep = new TimelineEntity();
+    entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP");
+    entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP");
+    entityToStorePrep.setDomainId("TEST_DOMAIN");
+    entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2",
+        "TEST_ENTITY_ID_2");
+    entityToStorePrep.setStartTime(0L);
+
+    TimelineEntities entitiesPrep = new TimelineEntities();
+    entitiesPrep.addEntity(entityToStorePrep);
+    store.put(entitiesPrep);
+
+    long start = System.currentTimeMillis();
+    int num = 1000000;
+
+    Log.info("Start test for " + num);
+
+    final String tezTaskAttemptId = "TEZ_TA";
+    final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_";
+    final String tezTaskId = "TEZ_T";
+    final String tezDomainId = "Tez_ATS_application_1429158534256_0001";
+
+    TimelineEntity entityToStore = new TimelineEntity();
+    TimelineEvent startEvt = new TimelineEvent();
+    entityToStore.setEntityType(tezTaskAttemptId);
+
+    startEvt.setEventType("TASK_ATTEMPT_STARTED");
+    startEvt.setTimestamp(0);
+    entityToStore.addEvent(startEvt);
+    entityToStore.setDomainId(tezDomainId);
+
+    entityToStore.addPrimaryFilter("status", "SUCCEEDED");
+    entityToStore.addPrimaryFilter("applicationId",
+        "application_1429158534256_0001");
+    entityToStore.addPrimaryFilter("TEZ_VERTEX_ID",
+        "vertex_1429158534256_0001_1_00");
+    entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1");
+    entityToStore.addPrimaryFilter("TEZ_TASK_ID",
+        "task_1429158534256_0001_1_00_000000");
+
+    entityToStore.setStartTime(0L);
+    entityToStore.addOtherInfo("startTime", 0);
+    entityToStore.addOtherInfo("inProgressLogsURL",
+        "localhost:8042/inProgressLogsURL");
+    entityToStore.addOtherInfo("completedLogsURL", "");
+    entityToStore.addOtherInfo("nodeId", "localhost:54450");
+    entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042");
+    entityToStore.addOtherInfo("containerId",
+        "container_1429158534256_0001_01_000002");
+    entityToStore.addOtherInfo("status", "RUNNING");
+    entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1");
+
+    TimelineEntities entities = new TimelineEntities();
+    entities.addEntity(entityToStore);
+
+    for (int i = 0; i < num; ++i) {
+      entityToStore.setEntityId(tezEntityId + i);
+      store.put(entities);
+    }
+
+    long duration = System.currentTimeMillis() - start;
+    Log.info("Duration for " + num + ": " + duration);
+  }
+
+  public static void main(String[] args) throws Exception {
+    TestRollingLevelDBTimelineStore store =
+        new TestRollingLevelDBTimelineStore();
+    store.setup();
+    store.testStorePerformance();
+    store.tearDown();
+  }
+}
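
A detail the domain tests above rely on: TimelineStore.put() does not throw when an individual entity is rejected (for example FORBIDDEN_RELATION in testRelatingToOldEntityWithoutDomainId); the failure is reported per entity in the TimelinePutResponse. A small, hypothetical helper showing how a caller would surface those errors:

import java.io.IOException;

import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.timeline.TimelineStore;

final class PutErrorReportSketch {
  private PutErrorReportSketch() {
  }

  // Hypothetical helper: put the entities and log every per-entity error
  // code from the response instead of assuming the write succeeded.
  static TimelinePutResponse putAndReport(TimelineStore store,
      TimelineEntities entities) throws IOException {
    TimelinePutResponse response = store.put(entities);
    for (TimelinePutError error : response.getErrors()) {
      System.err.println("Rejected " + error.getEntityType() + "/"
          + error.getEntityId() + " with error code " + error.getErrorCode());
    }
    return response;
  }
}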

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bb035ff0/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
index 6ac5a35..71e298c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java
@@ -70,7 +70,7 @@ public class TimelineStoreTestUtils {
   protected String entityId6;
   protected String entityId7;
   protected String entityType7;
-  
+
   protected Map<String, Set<Object>> primaryFilters;
   protected Map<String, Object> secondaryFilters;
   protected Map<String, Object> allFilters;
@@ -105,7 +105,7 @@ public class TimelineStoreTestUtils {
     Set<Object> l1 = new HashSet<Object>();
     l1.add("username");
     Set<Object> l2 = new HashSet<Object>();
-    l2.add((long)Integer.MAX_VALUE);
+    l2.add(Integer.MAX_VALUE);
     Set<Object> l3 = new HashSet<Object>();
     l3.add("123abc");
     Set<Object> l4 = new HashSet<Object>();
@@ -115,7 +115,7 @@ public class TimelineStoreTestUtils {
     primaryFilters.put("other", l3);
     primaryFilters.put("long", l4);
     Map<String, Object> secondaryFilters = new HashMap<String, Object>();
-    secondaryFilters.put("startTime", 123456l);
+    secondaryFilters.put("startTime", 123456);
     secondaryFilters.put("status", "RUNNING");
     Map<String, Object> otherInfo1 = new HashMap<String, Object>();
     otherInfo1.put("info1", "val1");
@@ -139,7 +139,7 @@ public class TimelineStoreTestUtils {
     relatedEntities.put(entityType2, Collections.singleton(entityId2));
 
     TimelineEvent ev3 = createEvent(789l, "launch_event", null);
-    TimelineEvent ev4 = createEvent(-123l, "init_event", null);
+    TimelineEvent ev4 = createEvent(0l, "init_event", null);
     List<TimelineEvent> events = new ArrayList<TimelineEvent>();
     events.add(ev3);
     events.add(ev4);
@@ -302,7 +302,7 @@ public class TimelineStoreTestUtils {
     relEntityMap2.put(entityType4, Collections.singleton(entityId4));
 
     ev3 = createEvent(789l, "launch_event", null);
-    ev4 = createEvent(-123l, "init_event", null);
+    ev4 = createEvent(0l, "init_event", null);
     events2 = new ArrayList<TimelineEvent>();
     events2.add(ev3);
     events2.add(ev4);
@@ -384,7 +384,7 @@ public class TimelineStoreTestUtils {
         entityType1, EnumSet.allOf(Field.class)), domainId1);
 
     verifyEntityInfo(entityId2, entityType2, events2, relEntityMap,
-        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2,
+        EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2,
         entityType2, EnumSet.allOf(Field.class)), domainId1);
 
     verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,

