Return-Path: X-Original-To: apmail-hadoop-common-commits-archive@www.apache.org Delivered-To: apmail-hadoop-common-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BD20A17C0D for ; Thu, 7 May 2015 17:09:31 +0000 (UTC) Received: (qmail 27443 invoked by uid 500); 7 May 2015 17:09:31 -0000 Delivered-To: apmail-hadoop-common-commits-archive@hadoop.apache.org Received: (qmail 27375 invoked by uid 500); 7 May 2015 17:09:31 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: common-dev@hadoop.apache.org Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 27359 invoked by uid 99); 7 May 2015 17:09:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 07 May 2015 17:09:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13BF1E4415; Thu, 7 May 2015 17:09:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: zjshen@apache.org To: common-commits@hadoop.apache.org Date: Thu, 07 May 2015 17:09:31 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/3] hadoop git commit: YARN-3448. Added a rolling time-to-live LevelDB timeline store implementation. Contributed by Jonathan Eagles. Repository: hadoop Updated Branches: refs/heads/trunk 8e991f4b1 -> daf3e4ef8 http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java index c5c0f93..121e9f3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLeveldbTimelineStore.java @@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.records.Version; -import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; -import org.apache.hadoop.yarn.server.timeline.NameValuePair; import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.iq80.leveldb.DBException; import org.junit.After; @@ -155,7 +153,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, iterator, pfIterator, false); } catch(DBException e) { - throw new IOException(e); + throw new IOException(e); } finally { IOUtils.cleanup(null, iterator, pfIterator); } @@ -179,12 +177,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { assertEquals(1, getEntities("type_2").size()); assertEquals(false, deleteNextEntity(entityType1, - writeReverseOrderedLong(60l))); + writeReverseOrderedLong(60L))); assertEquals(3, getEntities("type_1").size()); assertEquals(1, getEntities("type_2").size()); assertEquals(true, deleteNextEntity(entityType1, - writeReverseOrderedLong(123l))); + writeReverseOrderedLong(123L))); List entities = getEntities("type_2"); assertEquals(1, entities.size()); verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( @@ -198,12 +196,12 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(1), domainId2); - ((LeveldbTimelineStore)store).discardOldEntities(-123l); + ((LeveldbTimelineStore)store).discardOldEntities(0L); assertEquals(2, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); @@ -240,11 +238,11 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, primaryFilters, otherInfo, entities.get(2), domainId2); - ((LeveldbTimelineStore)store).discardOldEntities(-123l); + ((LeveldbTimelineStore)store).discardOldEntities(-123L); assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntities("type_1").size()); assertEquals(0, getEntities("type_2").size()); assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); @@ -261,7 +259,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { assertEquals(1, getEntitiesFromTs("type_2", l).size()); assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, l).size()); - ((LeveldbTimelineStore)store).discardOldEntities(123l); + ((LeveldbTimelineStore)store).discardOldEntities(123L); assertEquals(0, getEntitiesFromTs("type_1", l).size()); assertEquals(0, getEntitiesFromTs("type_2", l).size()); assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, @@ -279,7 +277,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { assertEquals(1, getEntities("type_2").size()); assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); } - + @Test public void testCheckVersion() throws IOException { LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store; @@ -299,16 +297,15 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { Assert.assertEquals(defaultVersion, dbStore.loadVersion()); // incompatible version - Version incompatibleVersion = - Version.newInstance(defaultVersion.getMajorVersion() + 1, - defaultVersion.getMinorVersion()); + Version incompatibleVersion = Version.newInstance( + defaultVersion.getMajorVersion() + 1, defaultVersion.getMinorVersion()); dbStore.storeVersion(incompatibleVersion); try { restartTimelineStore(); Assert.fail("Incompatible version, should expect fail here."); } catch (ServiceStateException e) { - Assert.assertTrue("Exception message mismatch", - e.getMessage().contains("Incompatible version for timeline store")); + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java new file mode 100644 index 0000000..d2d0860 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDB.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import java.io.File; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.iq80.leveldb.DB; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +/** Test class for verification of RollingLevelDB. */ +public class TestRollingLevelDB { + private Configuration conf = new YarnConfiguration(); + private FileSystem lfs; + private MyRollingLevelDB rollingLevelDB; + + /** RollingLevelDB for testing that has a setting current time. */ + public static class MyRollingLevelDB extends RollingLevelDB { + private long currentTimeMillis; + + MyRollingLevelDB() { + super("Test"); + this.currentTimeMillis = System.currentTimeMillis(); + } + + @Override + protected long currentTimeMillis() { + return currentTimeMillis; + } + + public void setCurrentTimeMillis(long time) { + this.currentTimeMillis = time; + } + }; + + @Before + public void setup() throws Exception { + lfs = FileSystem.getLocal(conf); + File fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + lfs.delete(new Path(fsPath.getAbsolutePath()), true); + rollingLevelDB = new MyRollingLevelDB(); + } + + @Test + public void testInsertAfterRollPeriodRollsDB() throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + DB db = rollingLevelDB.getDBForStartTime(now); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + now = rollingLevelDB.getNextRollingTimeMillis(); + rollingLevelDB.setCurrentTimeMillis(now); + db = rollingLevelDB.getDBForStartTime(now); + startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now), + startTime); + } + + @Test + public void testInsertForPreviousPeriodAfterRollPeriodRollsDB() + throws Exception { + + rollingLevelDB.init(conf); + long now = rollingLevelDB.currentTimeMillis(); + now = rollingLevelDB.computeCurrentCheckMillis(now); + rollingLevelDB.setCurrentTimeMillis(now); + DB db = rollingLevelDB.getDBForStartTime(now - 1); + long startTime = rollingLevelDB.getStartTimeFor(db); + Assert.assertEquals("Received level db for incorrect start time", + rollingLevelDB.computeCurrentCheckMillis(now - 1), + startTime); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java new file mode 100644 index 0000000..956e9e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TestRollingLevelDBTimelineStore.java @@ -0,0 +1,427 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timeline; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import java.io.File; +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.records.Version; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.mortbay.log.Log; + +/** Test class to verify RollingLevelDBTimelineStore. */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils { + private FileContext fsContext; + private File fsPath; + private Configuration config = new YarnConfiguration(); + + @Before + public void setup() throws Exception { + fsContext = FileContext.getLocalFSFileContext(); + fsPath = new File("target", this.getClass().getSimpleName() + + "-tmpDir").getAbsoluteFile(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, + fsPath.getAbsolutePath()); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + loadTestEntityData(); + loadVerificationEntityData(); + loadTestDomainData(); + } + + @After + public void tearDown() throws Exception { + store.stop(); + fsContext.delete(new Path(fsPath.getAbsolutePath()), true); + } + + @Test + public void testRootDirPermission() throws IOException { + FileSystem fs = FileSystem.getLocal(new YarnConfiguration()); + FileStatus file = fs.getFileStatus(new Path(fsPath.getAbsolutePath(), + RollingLevelDBTimelineStore.FILENAME)); + assertNotNull(file); + assertEquals(RollingLevelDBTimelineStore.LEVELDB_DIR_UMASK, + file.getPermission()); + } + + @Test + public void testGetSingleEntity() throws IOException { + super.testGetSingleEntity(); + ((RollingLevelDBTimelineStore)store).clearStartTimeCache(); + super.testGetSingleEntity(); + loadTestEntityData(); + } + + @Test + public void testGetEntities() throws IOException { + super.testGetEntities(); + } + + @Test + public void testGetEntitiesWithFromId() throws IOException { + super.testGetEntitiesWithFromId(); + } + + @Test + public void testGetEntitiesWithFromTs() throws IOException { + // feature not supported + } + + @Test + public void testGetEntitiesWithPrimaryFilters() throws IOException { + super.testGetEntitiesWithPrimaryFilters(); + } + + @Test + public void testGetEntitiesWithSecondaryFilters() throws IOException { + super.testGetEntitiesWithSecondaryFilters(); + } + + @Test + public void testGetEvents() throws IOException { + super.testGetEvents(); + } + + @Test + public void testCacheSizes() { + Configuration conf = new Configuration(); + assertEquals(10000, + RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + assertEquals(10000, + RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 10001); + assertEquals(10001, + RollingLevelDBTimelineStore.getStartTimeReadCacheSize(conf)); + conf = new Configuration(); + conf.setInt( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 10002); + assertEquals(10002, + RollingLevelDBTimelineStore.getStartTimeWriteCacheSize(conf)); + } + + @Test + public void testCheckVersion() throws IOException { + RollingLevelDBTimelineStore dbStore = (RollingLevelDBTimelineStore) store; + // default version + Version defaultVersion = dbStore.getCurrentVersion(); + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // compatible version + Version compatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion(), + defaultVersion.getMinorVersion() + 2); + dbStore.storeVersion(compatibleVersion); + Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); + restartTimelineStore(); + dbStore = (RollingLevelDBTimelineStore) store; + // overwrite the compatible version + Assert.assertEquals(defaultVersion, dbStore.loadVersion()); + + // incompatible version + Version incompatibleVersion = + Version.newInstance(defaultVersion.getMajorVersion() + 1, + defaultVersion.getMinorVersion()); + dbStore.storeVersion(incompatibleVersion); + try { + restartTimelineStore(); + Assert.fail("Incompatible version, should expect fail here."); + } catch (ServiceStateException e) { + Assert.assertTrue("Exception message mismatch", + e.getMessage().contains("Incompatible version for timeline store")); + } + } + + @Test + public void testValidateConfig() throws IOException { + Configuration copyConfig = new YarnConfiguration(config); + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_TTL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE)); + } + try { + Configuration newConfig = new YarnConfiguration(copyConfig); + newConfig.setLong( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, + 0); + config = newConfig; + restartTimelineStore(); + Assert.fail(); + } catch (IllegalArgumentException e) { + Assert.assertTrue(e.getMessage().contains( + YarnConfiguration + .TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE)); + } + config = copyConfig; + restartTimelineStore(); + } + + private void restartTimelineStore() throws IOException { + // need to close so leveldb releases database lock + if (store != null) { + store.close(); + } + store = new RollingLevelDBTimelineStore(); + store.init(config); + store.start(); + } + + @Test + public void testGetDomain() throws IOException { + super.testGetDomain(); + } + + @Test + public void testGetDomains() throws IOException { + super.testGetDomains(); + } + + @Test + public void testRelatingToNonExistingEntity() throws IOException { + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); + entityToStore.setEntityId("TEST_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + TimelineEntity entityToGet = + store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("TEST_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("TEST_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + } + + @Test + public void testRelatingToEntityInSamePut() throws IOException { + TimelineEntity entityToRelate = new TimelineEntity(); + entityToRelate.setEntityType("TEST_ENTITY_TYPE_2"); + entityToRelate.setEntityId("TEST_ENTITY_ID_2"); + entityToRelate.setDomainId("TEST_DOMAIN"); + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); + entityToStore.setEntityId("TEST_ENTITY_ID_1"); + entityToStore.setDomainId("TEST_DOMAIN"); + entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + entities.addEntity(entityToRelate); + store.put(entities); + TimelineEntity entityToGet = + store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("TEST_DOMAIN", entityToGet.getDomainId()); + Assert.assertEquals("TEST_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("TEST_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + } + + @Test + public void testRelatingToOldEntityWithoutDomainId() throws IOException { + // New entity is put in the default domain + TimelineEntity entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_1"); + entityToStore.setEntityId("NEW_ENTITY_ID_1"); + entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + store.put(entities); + + TimelineEntity entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + Assert.assertEquals("NEW_ENTITY_TYPE_1", + entityToGet.getRelatedEntities().keySet().iterator().next()); + Assert.assertEquals("NEW_ENTITY_ID_1", + entityToGet.getRelatedEntities().values().iterator().next() + .iterator().next()); + + // New entity is not put in the default domain + entityToStore = new TimelineEntity(); + entityToStore.setEntityType("NEW_ENTITY_TYPE_2"); + entityToStore.setEntityId("NEW_ENTITY_ID_2"); + entityToStore.setDomainId("NON_DEFAULT"); + entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); + entities = new TimelineEntities(); + entities.addEntity(entityToStore); + TimelinePutResponse response = store.put(entities); + Assert.assertEquals(1, response.getErrors().size()); + Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION, + response.getErrors().get(0).getErrorCode()); + entityToGet = + store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); + Assert.assertNotNull(entityToGet); + Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); + // Still have one related entity + Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size()); + Assert.assertEquals(1, entityToGet.getRelatedEntities().values() + .iterator().next().size()); + } + + public void testStorePerformance() throws IOException { + TimelineEntity entityToStorePrep = new TimelineEntity(); + entityToStorePrep.setEntityType("TEST_ENTITY_TYPE_PREP"); + entityToStorePrep.setEntityId("TEST_ENTITY_ID_PREP"); + entityToStorePrep.setDomainId("TEST_DOMAIN"); + entityToStorePrep.addRelatedEntity("TEST_ENTITY_TYPE_2", + "TEST_ENTITY_ID_2"); + entityToStorePrep.setStartTime(0L); + + TimelineEntities entitiesPrep = new TimelineEntities(); + entitiesPrep.addEntity(entityToStorePrep); + store.put(entitiesPrep); + + long start = System.currentTimeMillis(); + int num = 1000000; + + Log.info("Start test for " + num); + + final String tezTaskAttemptId = "TEZ_TA"; + final String tezEntityId = "attempt_1429158534256_0001_1_00_000000_"; + final String tezTaskId = "TEZ_T"; + final String tezDomainId = "Tez_ATS_application_1429158534256_0001"; + + TimelineEntity entityToStore = new TimelineEntity(); + TimelineEvent startEvt = new TimelineEvent(); + entityToStore.setEntityType(tezTaskAttemptId); + + startEvt.setEventType("TASK_ATTEMPT_STARTED"); + startEvt.setTimestamp(0); + entityToStore.addEvent(startEvt); + entityToStore.setDomainId(tezDomainId); + + entityToStore.addPrimaryFilter("status", "SUCCEEDED"); + entityToStore.addPrimaryFilter("applicationId", + "application_1429158534256_0001"); + entityToStore.addPrimaryFilter("TEZ_VERTEX_ID", + "vertex_1429158534256_0001_1_00"); + entityToStore.addPrimaryFilter("TEZ_DAG_ID", "dag_1429158534256_0001_1"); + entityToStore.addPrimaryFilter("TEZ_TASK_ID", + "task_1429158534256_0001_1_00_000000"); + + entityToStore.setStartTime(0L); + entityToStore.addOtherInfo("startTime", 0); + entityToStore.addOtherInfo("inProgressLogsURL", + "localhost:8042/inProgressLogsURL"); + entityToStore.addOtherInfo("completedLogsURL", ""); + entityToStore.addOtherInfo("nodeId", "localhost:54450"); + entityToStore.addOtherInfo("nodeHttpAddress", "localhost:8042"); + entityToStore.addOtherInfo("containerId", + "container_1429158534256_0001_01_000002"); + entityToStore.addOtherInfo("status", "RUNNING"); + entityToStore.addRelatedEntity(tezTaskId, "TEZ_TASK_ID_1"); + + TimelineEntities entities = new TimelineEntities(); + entities.addEntity(entityToStore); + + for (int i = 0; i < num; ++i) { + entityToStore.setEntityId(tezEntityId + i); + store.put(entities); + } + + long duration = System.currentTimeMillis() - start; + Log.info("Duration for " + num + ": " + duration); + } + + public static void main(String[] args) throws Exception { + TestRollingLevelDBTimelineStore store = + new TestRollingLevelDBTimelineStore(); + store.setup(); + store.testStorePerformance(); + store.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/daf3e4ef/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java index 6ac5a35..71e298c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/timeline/TimelineStoreTestUtils.java @@ -70,7 +70,7 @@ public class TimelineStoreTestUtils { protected String entityId6; protected String entityId7; protected String entityType7; - + protected Map> primaryFilters; protected Map secondaryFilters; protected Map allFilters; @@ -105,7 +105,7 @@ public class TimelineStoreTestUtils { Set l1 = new HashSet(); l1.add("username"); Set l2 = new HashSet(); - l2.add((long)Integer.MAX_VALUE); + l2.add(Integer.MAX_VALUE); Set l3 = new HashSet(); l3.add("123abc"); Set l4 = new HashSet(); @@ -115,7 +115,7 @@ public class TimelineStoreTestUtils { primaryFilters.put("other", l3); primaryFilters.put("long", l4); Map secondaryFilters = new HashMap(); - secondaryFilters.put("startTime", 123456l); + secondaryFilters.put("startTime", 123456); secondaryFilters.put("status", "RUNNING"); Map otherInfo1 = new HashMap(); otherInfo1.put("info1", "val1"); @@ -139,7 +139,7 @@ public class TimelineStoreTestUtils { relatedEntities.put(entityType2, Collections.singleton(entityId2)); TimelineEvent ev3 = createEvent(789l, "launch_event", null); - TimelineEvent ev4 = createEvent(-123l, "init_event", null); + TimelineEvent ev4 = createEvent(0l, "init_event", null); List events = new ArrayList(); events.add(ev3); events.add(ev4); @@ -302,7 +302,7 @@ public class TimelineStoreTestUtils { relEntityMap2.put(entityType4, Collections.singleton(entityId4)); ev3 = createEvent(789l, "launch_event", null); - ev4 = createEvent(-123l, "init_event", null); + ev4 = createEvent(0l, "init_event", null); events2 = new ArrayList(); events2.add(ev3); events2.add(ev4); @@ -384,7 +384,7 @@ public class TimelineStoreTestUtils { entityType1, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId2, entityType2, events2, relEntityMap, - EMPTY_PRIMARY_FILTERS, EMPTY_MAP, -123l, store.getEntity(entityId2, + EMPTY_PRIMARY_FILTERS, EMPTY_MAP, 0l, store.getEntity(entityId2, entityType2, EnumSet.allOf(Field.class)), domainId1); verifyEntityInfo(entityId4, entityType4, EMPTY_EVENTS, EMPTY_REL_ENTITIES,