From: vinodkv@apache.org
To: yarn-commits@hadoop.apache.org
Subject: svn commit: r1574147 - in /hadoop/common/branches/branch-2.4/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/ap...
Date: Tue, 04 Mar 2014 17:34:19 -0000
Message-Id: <20140304173420.1061923888E2@eris.apache.org>

Author: vinodkv
Date: Tue Mar 4 17:34:19 2014
New Revision: 1574147

URL: http://svn.apache.org/r1574147
Log:
YARN-1730. Implemented simple write-locking in the LevelDB based timeline-store. Contributed by Billie Rinaldi.
svn merge --ignore-ancestry -c 1574145 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
    hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt?rev=1574147&r1=1574146&r2=1574147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/CHANGES.txt Tue Mar 4 17:34:19 2014
@@ -207,6 +207,9 @@ Release 2.4.0 - UNRELEASED
     YARN-1765. Added test cases to verify that killApplication API works across
     ResourceManager failover. (Xuan Gong via vinodkv)
 
+    YARN-1730. Implemented simple write-locking in the LevelDB based timeline-
+    store. (Billie Rinaldi via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1574147&r1=1574146&r2=1574147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Mar 4 17:34:19 2014
@@ -1073,9 +1073,22 @@ public class YarnConfiguration extends C
   public static final String TIMELINE_SERVICE_STORE =
       TIMELINE_SERVICE_PREFIX + "store-class";
 
+  public static final String TIMELINE_SERVICE_LEVELDB_PREFIX =
+      TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.";
+
   /** Timeline service leveldb path */
   public static final String TIMELINE_SERVICE_LEVELDB_PATH =
-      TIMELINE_SERVICE_PREFIX + "leveldb-timeline-store.path";
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "path";
+
+  /** Timeline service leveldb start time read cache (number of entities) */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-read-cache-size";
+
+  /** Timeline service leveldb start time write cache (number of entities) */
+  public static final String
+      TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE =
+      TIMELINE_SERVICE_LEVELDB_PREFIX + "start-time-write-cache-size";
 
   ////////////////////////////////
   // Other Configs

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java?rev=1574147&r1=1574146&r2=1574147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/LeveldbTimelineStore.java Tue Mar 4 17:34:19 2014
@@ -33,6 +33,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.collections.map.LRUMap;
@@ -84,11 +85,17 @@ public class LeveldbTimelineStore extend
 
   private static final byte[] EMPTY_BYTES = new byte[0];
 
-  private static final int START_TIME_CACHE_SIZE = 10000;
+  private static final int DEFAULT_START_TIME_READ_CACHE_SIZE = 10000;
+  private static final int DEFAULT_START_TIME_WRITE_CACHE_SIZE = 10000;
 
-  @SuppressWarnings("unchecked")
-  private final Map<EntityIdentifier, Long> startTimeCache =
-      Collections.synchronizedMap(new LRUMap(START_TIME_CACHE_SIZE));
+  private Map<EntityIdentifier, Long> startTimeWriteCache;
+  private Map<EntityIdentifier, Long> startTimeReadCache;
+
+  /**
+   * Per-entity locks are obtained when writing.
+   */
+  private final LockMap<EntityIdentifier> writeLocks =
+      new LockMap<EntityIdentifier>();
 
   private DB db;
 
@@ -97,6 +104,7 @@ public class LeveldbTimelineStore extend
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   protected void serviceInit(Configuration conf) throws Exception {
     Options options = new Options();
     options.createIfMissing(true);
@@ -109,6 +117,12 @@ public class LeveldbTimelineStore extend
         "timeline store " + path);
     LOG.info("Using leveldb path " + path);
     db = factory.open(new File(path, FILENAME), options);
+    startTimeWriteCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
+            conf)));
+    startTimeReadCache =
+        Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
+            conf)));
     super.serviceInit(conf);
   }
 
@@ -118,6 +132,45 @@ public class LeveldbTimelineStore extend
     super.serviceStop();
   }
 
+  private static class LockMap<K> {
+    private static class CountingReentrantLock<K> extends ReentrantLock {
+      private int count;
+      private K key;
+
+      CountingReentrantLock(K key) {
+        super();
+        this.count = 0;
+        this.key = key;
+      }
+    }
+
+    private Map<K, CountingReentrantLock<K>> locks =
+        new HashMap<K, CountingReentrantLock<K>>();
+
+    synchronized CountingReentrantLock<K> getLock(K key) {
+      CountingReentrantLock<K> lock = locks.get(key);
+      if (lock == null) {
+        lock = new CountingReentrantLock<K>(key);
+        locks.put(key, lock);
+      }
+
+      lock.count++;
+      return lock;
+    }
+
+    synchronized void returnLock(CountingReentrantLock<K> lock) {
+      if (lock.count == 0) {
+        throw new IllegalStateException("Returned lock more times than it " +
+            "was retrieved");
+      }
+      lock.count--;
+
+      if (lock.count == 0) {
+        locks.remove(lock.key);
+      }
+    }
+  }
+
   private static class KeyBuilder {
     private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
     private byte[][] b;
@@ -214,7 +267,7 @@ public class LeveldbTimelineStore extend
       EnumSet<Field> fields) throws IOException {
     DBIterator iterator = null;
     try {
-      byte[] revStartTime = getStartTime(entityId, entityType, null, null, null);
+      byte[] revStartTime = getStartTime(entityId, entityType);
       if (revStartTime == null)
         return null;
       byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
@@ -338,7 +391,7 @@ public class LeveldbTimelineStore extend
     // look up start times for the specified entities
     // skip entities with no start time
     for (String entity : entityIds) {
-      byte[] startTime = getStartTime(entity, entityType, null, null, null);
+      byte[] startTime = getStartTime(entity, entityType);
       if (startTime != null) {
         List<EntityIdentifier> entities = startTimeMap.get(startTime);
         if (entities == null) {
@@ -529,12 +582,16 @@ public class LeveldbTimelineStore extend
    * response.
    */
   private void put(TimelineEntity entity, TimelinePutResponse response) {
+    LockMap.CountingReentrantLock<EntityIdentifier> lock =
+        writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
+            entity.getEntityType()));
+    lock.lock();
     WriteBatch writeBatch = null;
     try {
       writeBatch = db.createWriteBatch();
       List<TimelineEvent> events = entity.getEvents();
       // look up the start time for the entity
-      byte[] revStartTime = getStartTime(entity.getEntityId(),
+      byte[] revStartTime = getAndSetStartTime(entity.getEntityId(),
           entity.getEntityType(), entity.getStartTime(), events,
           writeBatch);
       if (revStartTime == null) {
@@ -571,7 +628,7 @@ public class LeveldbTimelineStore extend
         String relatedEntityType = relatedEntityList.getKey();
         for (String relatedEntityId : relatedEntityList.getValue()) {
           // look up start time of related entity
-          byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
+          byte[] relatedEntityStartTime = getAndSetStartTime(relatedEntityId,
               relatedEntityType, null, null, writeBatch);
           if (relatedEntityStartTime == null) {
             // if start time is not found, set start time of the related
@@ -580,7 +637,7 @@ public class LeveldbTimelineStore extend
             relatedEntityStartTime = revStartTime;
             writeBatch.put(createStartTimeLookupKey(relatedEntityId,
                 relatedEntityType), relatedEntityStartTime);
-            startTimeCache.put(new EntityIdentifier(relatedEntityId,
+            startTimeWriteCache.put(new EntityIdentifier(relatedEntityId,
                 relatedEntityType), revStartTimeLong);
           }
           // write reverse entry (related entity -> entity)
@@ -629,6 +686,8 @@ public class LeveldbTimelineStore extend
       error.setErrorCode(TimelinePutError.IO_EXCEPTION);
       response.addError(error);
     } finally {
+      lock.unlock();
+      writeLocks.returnLock(lock);
       IOUtils.cleanup(LOG, writeBatch);
     }
   }
@@ -666,6 +725,39 @@ public class LeveldbTimelineStore extend
    *
    * @param entityId The id of the entity
    * @param entityType The type of the entity
+   * @return A byte array
+   * @throws IOException
+   */
+  private byte[] getStartTime(String entityId, String entityType)
+      throws IOException {
+    EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
+    // start time is not provided, so try to look it up
+    if (startTimeReadCache.containsKey(entity)) {
+      // found the start time in the cache
+      return writeReverseOrderedLong(startTimeReadCache.get(entity));
+    } else {
+      // try to look up the start time in the db
+      byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+      byte[] v = db.get(b);
+      if (v == null) {
+        // did not find the start time in the db
+        return null;
+      } else {
+        // found the start time in the db
+        startTimeReadCache.put(entity, readReverseOrderedLong(v, 0));
+        return v;
+      }
+    }
+  }
+
+  /**
+   * Get the unique start time for a given entity as a byte array that sorts
+   * the timestamps in reverse order (see {@link
+   * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
+   * doesn't exist, set it based on the information provided.
+   *
+   * @param entityId The id of the entity
+   * @param entityType The type of the entity
    * @param startTime The start time of the entity, or null
    * @param events A list of events for the entity, or null
    * @param writeBatch A leveldb write batch, if the method is called by a
@@ -673,62 +765,76 @@ public class LeveldbTimelineStore extend
    * @return A byte array
    * @throws IOException
    */
-  private byte[] getStartTime(String entityId, String entityType,
+  private byte[] getAndSetStartTime(String entityId, String entityType,
       Long startTime, List<TimelineEvent> events, WriteBatch writeBatch)
       throws IOException {
     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
     if (startTime == null) {
       // start time is not provided, so try to look it up
-      if (startTimeCache.containsKey(entity)) {
+      if (startTimeWriteCache.containsKey(entity)) {
         // found the start time in the cache
-        startTime = startTimeCache.get(entity);
+        startTime = startTimeWriteCache.get(entity);
+        return writeReverseOrderedLong(startTime);
       } else {
-        // try to look up the start time in the db
-        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-        byte[] v = db.get(b);
-        if (v == null) {
-          // did not find the start time in the db
-          // if this is a put, try to set it from the provided events
-          if (events == null || writeBatch == null) {
-            // no events, or not a put, so return null
-            return null;
-          }
+        if (events != null) {
+          // prepare a start time from events in case it is needed
           Long min = Long.MAX_VALUE;
-          for (TimelineEvent e : events)
-            if (min > e.getTimestamp())
+          for (TimelineEvent e : events) {
+            if (min > e.getTimestamp()) {
               min = e.getTimestamp();
-          startTime = min;
-          // selected start time as minimum timestamp of provided events
-          // write start time to db and cache
-          writeBatch.put(b, writeReverseOrderedLong(startTime));
-          startTimeCache.put(entity, startTime);
-        } else {
-          // found the start time in the db
-          startTime = readReverseOrderedLong(v, 0);
-          if (writeBatch != null) {
-            // if this is a put, re-add the start time to the cache
-            startTimeCache.put(entity, startTime);
+            }
           }
+          startTime = min;
         }
+        return checkStartTimeInDb(entity, startTime, writeBatch);
       }
     } else {
       // start time is provided
-      // TODO: verify start time in db as well as cache?
-      if (startTimeCache.containsKey(entity)) {
-        // if the start time is already in the cache,
-        // and it is different from the provided start time,
-        // use the one from the cache
-        if (!startTime.equals(startTimeCache.get(entity)))
-          startTime = startTimeCache.get(entity);
-      } else if (writeBatch != null) {
-        // if this is a put, write the provided start time to the db and the
-        // cache
-        byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
-        writeBatch.put(b, writeReverseOrderedLong(startTime));
-        startTimeCache.put(entity, startTime);
+      if (startTimeWriteCache.containsKey(entity)) {
+        // check the provided start time matches the cache
+        if (!startTime.equals(startTimeWriteCache.get(entity))) {
+          // the start time is already in the cache,
+          // and it is different from the provided start time,
+          // so use the one from the cache
+          startTime = startTimeWriteCache.get(entity);
+        }
+        return writeReverseOrderedLong(startTime);
+      } else {
+        // check the provided start time matches the db
+        return checkStartTimeInDb(entity, startTime, writeBatch);
       }
     }
-    return writeReverseOrderedLong(startTime);
+  }
+
+  /**
+   * Checks db for start time and returns it if it exists. If it doesn't
+   * exist, writes the suggested start time (if it is not null).
+   * This is only called when the start time is not found in the cache,
+   * so it adds it back into the cache if it is found.
+   */
+  private byte[] checkStartTimeInDb(EntityIdentifier entity,
+      Long suggestedStartTime, WriteBatch writeBatch) throws IOException {
+    // create lookup key for start time
+    byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
+    // retrieve value for key
+    byte[] v = db.get(b);
+    byte[] revStartTime;
+    if (v == null) {
+      // start time doesn't exist in db
+      if (suggestedStartTime == null) {
+        return null;
+      }
+      // write suggested start time
+      revStartTime = writeReverseOrderedLong(suggestedStartTime);
+      writeBatch.put(b, revStartTime);
+    } else {
+      // found start time in db, so ignore suggested start time
+      suggestedStartTime = readReverseOrderedLong(v, 0);
+      revStartTime = v;
+    }
+    startTimeWriteCache.put(entity, suggestedStartTime);
+    startTimeReadCache.put(entity, suggestedStartTime);
+    return revStartTime;
   }
 
   /**
@@ -868,6 +974,21 @@ public class LeveldbTimelineStore extend
    */
   @VisibleForTesting
   void clearStartTimeCache() {
-    startTimeCache.clear();
+    startTimeWriteCache.clear();
+    startTimeReadCache.clear();
   }
+
+  @VisibleForTesting
+  static int getStartTimeReadCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        DEFAULT_START_TIME_READ_CACHE_SIZE);
+  }
+
+  @VisibleForTesting
+  static int getStartTimeWriteCacheSize(Configuration conf) {
+    return conf.getInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        DEFAULT_START_TIME_WRITE_CACHE_SIZE);
+  }
 }

Modified: hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java?rev=1574147&r1=1574146&r2=1574147&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/timeline/TestLeveldbTimelineStore.java Tue Mar 4 17:34:19 2014
@@ -30,6 +32,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TestLeveldbTimelineStore
@@ -64,6 +66,7 @@ public class TestLeveldbTimelineStore
     super.testGetSingleEntity();
     ((LeveldbTimelineStore)store).clearStartTimeCache();
     super.testGetSingleEntity();
+    loadTestData();
   }
 
   @Test
@@ -86,4 +89,20 @@ public class TestLeveldbTimelineStore
     super.testGetEvents();
   }
 
+  @Test
+  public void testCacheSizes() {
+    Configuration conf = new Configuration();
+    assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+    assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
+        10001);
+    assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
+    conf = new Configuration();
+    conf.setInt(
+        YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
+        10002);
+    assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
+  }
+
 }
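The substance of the change, stripped of the diff markup: put() now acquires a per-entity ReentrantLock keyed by (entityId, entityType) before building its write batch, and the lock map reference-counts each lock so an entry is dropped once no writer holds it. The single start-time cache is also split into a write-side cache (startTimeWriteCache, consulted under the write lock) and a read-side cache (startTimeReadCache), each sized by the new start-time-read-cache-size / start-time-write-cache-size settings under the leveldb-timeline-store prefix (default 10000, as asserted by testCacheSizes above). The sketch below shows the counting-lock pattern in isolation; KeyedLockMap, CountingLock, and the main() usage are illustrative stand-ins, not Hadoop classes. The actual implementation is the LockMap/CountingReentrantLock pair added to LeveldbTimelineStore in the diff.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative sketch (not Hadoop code) of a per-key, reference-counted lock map. */
public class KeyedLockMap<K> {

  /** A ReentrantLock that remembers its key and how many callers currently hold it. */
  static class CountingLock<K> extends ReentrantLock {
    private static final long serialVersionUID = 1L;
    private int refCount;
    private final K key;

    CountingLock(K key) {
      this.key = key;
    }
  }

  private final Map<K, CountingLock<K>> locks = new HashMap<K, CountingLock<K>>();

  /** Get (or create) the lock for a key and bump its reference count. */
  public synchronized CountingLock<K> getLock(K key) {
    CountingLock<K> lock = locks.get(key);
    if (lock == null) {
      lock = new CountingLock<K>(key);
      locks.put(key, lock);
    }
    lock.refCount++;
    return lock;
  }

  /** Return a lock; once the last holder returns it, drop it from the map. */
  public synchronized void returnLock(CountingLock<K> lock) {
    if (lock.refCount == 0) {
      throw new IllegalStateException("Returned lock more times than it was retrieved");
    }
    lock.refCount--;
    if (lock.refCount == 0) {
      locks.remove(lock.key);
    }
  }

  /** Usage mirroring put(): only writers of the same key are serialized. */
  public static void main(String[] args) {
    KeyedLockMap<String> lockMap = new KeyedLockMap<String>();
    CountingLock<String> lock = lockMap.getLock("entityId/entityType");
    lock.lock();
    try {
      // ... build and commit the write batch for this entity ...
    } finally {
      lock.unlock();
      lockMap.returnLock(lock);
    }
  }
}

The reference count is what makes removal safe: a plain remove-on-unlock could let one writer discard a lock another writer still holds, while counting holders keeps the map bounded without ever dropping a lock that is in use.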