hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From fab...@apache.org
Subject hadoop git commit: HADOOP-13649 s3guard: implement time-based (TTL) expiry for LocalMetadataStore (Gabor Bota)
Date Tue, 08 May 2018 22:30:25 GMT
Repository: hadoop
Updated Branches:
  refs/heads/trunk 1ef0a1db1 -> 69aac696d


HADOOP-13649 s3guard: implement time-based (TTL) expiry for LocalMetadataStore (Gabor Bota)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69aac696
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69aac696
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69aac696

Branch: refs/heads/trunk
Commit: 69aac696d9d4e32a55ba9b6992f41a9ad13424f1
Parents: 1ef0a1d
Author: Aaron Fabbri <fabbri@apache.org>
Authored: Tue May 8 15:29:46 2018 -0700
Committer: Aaron Fabbri <fabbri@apache.org>
Committed: Tue May 8 15:29:54 2018 -0700

----------------------------------------------------------------------
 .../fs/s3a/s3guard/LocalMetadataStore.java      |  93 ++++++++------
 .../hadoop/fs/s3a/s3guard/LruHashMap.java       |  50 --------
 .../fs/s3a/s3guard/TestLocalMetadataStore.java  | 121 +++++++++++++++----
 3 files changed, 153 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aac696/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
index 0061b56..742c41a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
@@ -21,7 +21,10 @@ package org.apache.hadoop.fs.s3a.s3guard;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,6 +40,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 /**
  * This is a local, in-memory, implementation of MetadataStore.
@@ -51,24 +55,35 @@ import java.util.Map;
  * This MetadataStore does not enforce filesystem rules such as disallowing
  * non-recursive removal of non-empty directories.  It is assumed the caller
  * already has to perform these sorts of checks.
+ *
+ * Internally it uses caches with time-based eviction.
  */
 public class LocalMetadataStore implements MetadataStore {
 
   public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class);
-  // TODO HADOOP-13649: use time instead of capacity for eviction.
   public static final int DEFAULT_MAX_RECORDS = 128;
+  public static final int DEFAULT_CACHE_ENTRY_TTL_MSEC = 10 * 1000;
 
   /**
    * Maximum number of records.
    */
+  @InterfaceStability.Evolving
   public static final String CONF_MAX_RECORDS =
       "fs.metadatastore.local.max_records";
 
+  /**
+   * Time to live in milliseconds.  If negative, time-based expiration is
+   * disabled; if zero, entries expire immediately and nothing is cached.
+   */
+  @InterfaceStability.Evolving
+  public static final String CONF_CACHE_ENTRY_TTL =
+      "fs.metadatastore.local.ttl";
+
   /** Contains directories and files. */
-  private LruHashMap<Path, PathMetadata> fileHash;
+  private Cache<Path, PathMetadata> fileCache;
 
   /** Contains directory listings. */
-  private LruHashMap<Path, DirListingMetadata> dirHash;
+  private Cache<Path, DirListingMetadata> dirCache;
 
   private FileSystem fs;
   /* Null iff this FS does not have an associated URI host. */
@@ -94,9 +109,15 @@ public class LocalMetadataStore implements MetadataStore {
     if (maxRecords < 4) {
       maxRecords = 4;
     }
-    // Start w/ less than max capacity.  Space / time trade off.
-    fileHash = new LruHashMap<>(maxRecords/2, maxRecords);
-    dirHash = new LruHashMap<>(maxRecords/4, maxRecords);
+    int ttl = conf.getInt(CONF_CACHE_ENTRY_TTL, DEFAULT_CACHE_ENTRY_TTL_MSEC);
+
+    CacheBuilder builder = CacheBuilder.newBuilder().maximumSize(maxRecords);
+    if (ttl >= 0) {
+      builder.expireAfterAccess(ttl, TimeUnit.MILLISECONDS);
+    }
+
+    fileCache = builder.build();
+    dirCache = builder.build();
   }
 
   @Override
@@ -130,12 +151,12 @@ public class LocalMetadataStore implements MetadataStore {
 
     // Delete entry from file cache, then from cached parent directory, if any
 
-    deleteHashEntries(path, tombstone);
+    deleteCacheEntries(path, tombstone);
 
     if (recursive) {
       // Remove all entries that have this dir as path prefix.
-      deleteHashByAncestor(path, dirHash, tombstone);
-      deleteHashByAncestor(path, fileHash, tombstone);
+      deleteEntryByAncestor(path, dirCache, tombstone);
+      deleteEntryByAncestor(path, fileCache, tombstone);
     }
   }
 
@@ -149,7 +170,7 @@ public class LocalMetadataStore implements MetadataStore {
       throws IOException {
     Path path = standardize(p);
     synchronized (this) {
-      PathMetadata m = fileHash.mruGet(path);
+      PathMetadata m = fileCache.getIfPresent(path);
 
       if (wantEmptyDirectoryFlag && m != null &&
           m.getFileStatus().isDirectory()) {
@@ -170,7 +191,7 @@ public class LocalMetadataStore implements MetadataStore {
    * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
    */
   private Tristate isEmptyDirectory(Path p) {
-    DirListingMetadata dirMeta = dirHash.get(p);
+    DirListingMetadata dirMeta = dirCache.getIfPresent(p);
     return dirMeta.withoutTombstones().isEmpty();
   }
 
@@ -178,7 +199,7 @@ public class LocalMetadataStore implements MetadataStore {
   public synchronized DirListingMetadata listChildren(Path p) throws
       IOException {
     Path path = standardize(p);
-    DirListingMetadata listing = dirHash.mruGet(path);
+    DirListingMetadata listing = dirCache.getIfPresent(path);
     if (LOG.isDebugEnabled()) {
       LOG.debug("listChildren({}) -> {}", path,
           listing == null ? "null" : listing.prettyPrint());
@@ -237,10 +258,10 @@ public class LocalMetadataStore implements MetadataStore {
       if (LOG.isDebugEnabled()) {
         LOG.debug("put {} -> {}", path, meta.prettyPrint());
       }
-      fileHash.put(path, meta);
+      fileCache.put(path, meta);
 
       /* Directory case:
-       * We also make sure we have an entry in the dirHash, so subsequent
+       * We also make sure we have an entry in the dirCache, so subsequent
        * listStatus(path) at least see the directory.
        *
        * If we had a boolean flag argument "isNew", we would know whether this
@@ -251,9 +272,9 @@ public class LocalMetadataStore implements MetadataStore {
        */
 
       if (status.isDirectory()) {
-        DirListingMetadata dir = dirHash.mruGet(path);
+        DirListingMetadata dir = dirCache.getIfPresent(path);
         if (dir == null) {
-          dirHash.put(path, new DirListingMetadata(path, DirListingMetadata
+          dirCache.put(path, new DirListingMetadata(path, DirListingMetadata
               .EMPTY_DIR, false));
         }
       }
@@ -261,14 +282,14 @@ public class LocalMetadataStore implements MetadataStore {
       /* Update cached parent dir. */
       Path parentPath = path.getParent();
       if (parentPath != null) {
-        DirListingMetadata parent = dirHash.mruGet(parentPath);
+        DirListingMetadata parent = dirCache.getIfPresent(parentPath);
         if (parent == null) {
         /* Track this new file's listing in parent.  Parent is not
          * authoritative, since there may be other items in it we don't know
          * about. */
           parent = new DirListingMetadata(parentPath,
               DirListingMetadata.EMPTY_DIR, false);
-          dirHash.put(parentPath, parent);
+          dirCache.put(parentPath, parent);
         }
         parent.put(status);
       }
@@ -280,7 +301,7 @@ public class LocalMetadataStore implements MetadataStore {
     if (LOG.isDebugEnabled()) {
       LOG.debug("put dirMeta {}", meta.prettyPrint());
     }
-    dirHash.put(standardize(meta.getPath()), meta);
+    dirCache.put(standardize(meta.getPath()), meta);
     put(meta.getListing());
   }
 
@@ -298,8 +319,8 @@ public class LocalMetadataStore implements MetadataStore {
 
   @Override
   public void destroy() throws IOException {
-    if (dirHash != null) {
-      dirHash.clear();
+    if (dirCache != null) {
+      dirCache.invalidateAll();
     }
   }
 
@@ -312,7 +333,7 @@ public class LocalMetadataStore implements MetadataStore {
   public synchronized void prune(long modTime, String keyPrefix)
       throws IOException {
     Iterator<Map.Entry<Path, PathMetadata>> files =
-        fileHash.entrySet().iterator();
+        fileCache.asMap().entrySet().iterator();
     while (files.hasNext()) {
       Map.Entry<Path, PathMetadata> entry = files.next();
       if (expired(entry.getValue().getFileStatus(), modTime, keyPrefix)) {
@@ -320,7 +341,7 @@ public class LocalMetadataStore implements MetadataStore {
       }
     }
     Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
-        dirHash.entrySet().iterator();
+        dirCache.asMap().entrySet().iterator();
     while (dirs.hasNext()) {
       Map.Entry<Path, DirListingMetadata> entry = dirs.next();
       Path path = entry.getKey();
@@ -335,9 +356,10 @@ public class LocalMetadataStore implements MetadataStore {
         }
       }
       if (newChildren.size() != oldChildren.size()) {
-        dirHash.put(path, new DirListingMetadata(path, newChildren, false));
+        dirCache.put(path, new DirListingMetadata(path, newChildren, false));
         if (!path.isRoot()) {
-          DirListingMetadata parent = dirHash.get(path.getParent());
+          DirListingMetadata parent = null;
+          parent = dirCache.getIfPresent(path.getParent());
           if (parent != null) {
             parent.setAuthoritative(false);
           }
@@ -354,9 +376,9 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   @VisibleForTesting
-  static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
+  static <T> void deleteEntryByAncestor(Path ancestor, Cache<Path, T> cache,
                                        boolean tombstone) {
-    for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
+    for (Iterator<Map.Entry<Path, T>> it = cache.asMap().entrySet().iterator();
          it.hasNext();) {
       Map.Entry<Path, T> entry = it.next();
       Path f = entry.getKey();
@@ -364,11 +386,11 @@ public class LocalMetadataStore implements MetadataStore {
       if (isAncestorOf(ancestor, f)) {
         if (tombstone) {
           if (meta instanceof PathMetadata) {
-            entry.setValue((T) PathMetadata.tombstone(f));
+            cache.put(f, (T) PathMetadata.tombstone(f));
           } else if (meta instanceof DirListingMetadata) {
             it.remove();
           } else {
-            throw new IllegalStateException("Unknown type in hash");
+            throw new IllegalStateException("Unknown type in cache");
           }
         } else {
           it.remove();
@@ -391,17 +413,17 @@ public class LocalMetadataStore implements MetadataStore {
   }
 
   /**
-   * Update fileHash and dirHash to reflect deletion of file 'f'.  Call with
+   * Update fileCache and dirCache to reflect deletion of file 'f'.  Call with
    * lock held.
    */
-  private void deleteHashEntries(Path path, boolean tombstone) {
+  private void deleteCacheEntries(Path path, boolean tombstone) {
 
     // Remove target file/dir
     LOG.debug("delete file entry for {}", path);
     if (tombstone) {
-      fileHash.put(path, PathMetadata.tombstone(path));
+      fileCache.put(path, PathMetadata.tombstone(path));
     } else {
-      fileHash.remove(path);
+      fileCache.invalidate(path);
     }
 
     // Update this and parent dir listing, if any
@@ -409,12 +431,13 @@ public class LocalMetadataStore implements MetadataStore {
     /* If this path is a dir, remove its listing */
     LOG.debug("removing listing of {}", path);
 
-    dirHash.remove(path);
+    dirCache.invalidate(path);
 
     /* Remove this path from parent's dir listing */
     Path parent = path.getParent();
     if (parent != null) {
-      DirListingMetadata dir = dirHash.get(parent);
+      DirListingMetadata dir = null;
+      dir = dirCache.getIfPresent(parent);
       if (dir != null) {
         LOG.debug("removing parent's entry for {} ", path);
         if (tombstone) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aac696/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
deleted file mode 100644
index e355095..0000000
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LruHashMap.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *  contributor license agreements.  See the NOTICE file distributed with
- *  this work for additional information regarding copyright ownership.
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *  (the "License"); you may not use this file except in compliance with
- *  the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.fs.s3a.s3guard;
-
-import java.util.LinkedHashMap;
-import java.util.Map;
-
-/**
- * LinkedHashMap that implements a maximum size and LRU eviction policy.
- */
-public class LruHashMap<K, V> extends LinkedHashMap<K, V> {
-  private final int maxSize;
-  public LruHashMap(int initialCapacity, int maxSize) {
-    super(initialCapacity);
-    this.maxSize = maxSize;
-  }
-
-  @Override
-  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
-    return size() > maxSize;
-  }
-
-  /**
-   * get() plus side-effect of making the element Most Recently Used.
-   * @param key lookup key
-   * @return value
-   */
-
-  public V mruGet(K key) {
-    V val = remove(key);
-    if (val != null) {
-      put(key, val);
-    }
-    return val;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/69aac696/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
index 1b765af..074319f 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestLocalMetadataStore.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.fs.s3a.s3guard;
 
 import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
+import com.google.common.base.Ticker;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.junit.Test;
 
 import org.apache.hadoop.conf.Configuration;
@@ -75,38 +77,105 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
 
   @Test
   public void testClearByAncestor() {
-    Map<Path, PathMetadata> map = new HashMap<>();
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder().build();
 
     // 1. Test paths without scheme/host
-    assertClearResult(map, "", "/", 0);
-    assertClearResult(map, "", "/dirA/dirB", 2);
-    assertClearResult(map, "", "/invalid", 5);
+    assertClearResult(cache, "", "/", 0);
+    assertClearResult(cache, "", "/dirA/dirB", 2);
+    assertClearResult(cache, "", "/invalid", 5);
 
 
     // 2. Test paths w/ scheme/host
     String p = "s3a://fake-bucket-name";
-    assertClearResult(map, p, "/", 0);
-    assertClearResult(map, p, "/dirA/dirB", 2);
-    assertClearResult(map, p, "/invalid", 5);
+    assertClearResult(cache, p, "/", 0);
+    assertClearResult(cache, p, "/dirA/dirB", 2);
+    assertClearResult(cache, p, "/invalid", 5);
   }
 
-  private static void populateMap(Map<Path, PathMetadata> map,
+  static class TestTicker extends Ticker {
+    private long myTicker = 0;
+    @Override public long read() {
+      return myTicker;
+    }
+    public void set(long val) {
+      this.myTicker = val;
+    }
+
+  }
+
+  /**
+   * Test that time-based eviction in the cache used by the
+   * {@link LocalMetadataStore} implementation works properly.
+   *
+   * The test creates a Ticker instance, which will be used to control the
+   * internal clock of the cache to achieve eviction without having to wait
+   * for the system clock.
+   * The test creates 3 entries: the 2nd and 3rd entries survive the
+   * eviction because they are created later than the 1st, using the ticker.
+   */
+  @Test
+  public void testCacheTimedEvictionAfterWrite() {
+    TestTicker testTicker = new TestTicker();
+    final long t0 = testTicker.read();
+    final long t1 = t0 + 100;
+    final long t2 = t1 + 100;
+
+    final long ttl = t1 + 50; // between t1 and t2
+
+    Cache<Path, PathMetadata> cache = CacheBuilder.newBuilder()
+        .expireAfterWrite(ttl,
+            TimeUnit.NANOSECONDS /* nanos to avoid conversions */)
+        .ticker(testTicker)
+        .build();
+
+    String p = "s3a://fake-bucket-name";
+    Path path1 = new Path(p + "/dirA/dirB/file1");
+    Path path2 = new Path(p + "/dirA/dirB/file2");
+    Path path3 = new Path(p + "/dirA/dirB/file3");
+
+    // Test time is t0
+    populateEntry(cache, path1);
+
+    // set new value on the ticker, so the next two entries will be added later
+    testTicker.set(t1);  // Test time is now t1
+    populateEntry(cache, path2);
+    populateEntry(cache, path3);
+
+    assertEquals("Cache should contain 3 records before eviction",
+        3, cache.size());
+    PathMetadata pm1 = cache.getIfPresent(path1);
+    assertNotNull("PathMetadata should not be null before eviction", pm1);
+
+    // set the ticker to a time when timed eviction should occur
+    // for the first entry
+    testTicker.set(t2);
+
+    // call cleanup explicitly, as timed expiration is performed with
+    // periodic maintenance during writes and occasionally during reads only
+    cache.cleanUp();
+
+    assertEquals("Cache size should be 2 after eviction", 2, cache.size());
+    pm1 = cache.getIfPresent(path1);
+    assertNull("PathMetadata should be null after eviction", pm1);
+  }
+
+  private static void populateMap(Cache<Path, PathMetadata> cache,
       String prefix) {
-    populateEntry(map, new Path(prefix + "/dirA/dirB/"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file1"));
-    populateEntry(map, new Path(prefix + "/dirA/dirB/dirC/file2"));
-    populateEntry(map, new Path(prefix + "/dirA/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file1"));
+    populateEntry(cache, new Path(prefix + "/dirA/dirB/dirC/file2"));
+    populateEntry(cache, new Path(prefix + "/dirA/file1"));
   }
 
-  private static void populateEntry(Map<Path, PathMetadata> map,
+  private static void populateEntry(Cache<Path, PathMetadata> cache,
       Path path) {
-    map.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
+    cache.put(path, new PathMetadata(new FileStatus(0, true, 0, 0, 0, path)));
   }
 
-  private static int sizeOfMap(Map<Path, PathMetadata> map) {
+  private static int sizeOfMap(Cache<Path, PathMetadata> cache) {
     int count = 0;
-    for (PathMetadata meta : map.values()) {
+    for (PathMetadata meta : cache.asMap().values()) {
       if (!meta.isDeleted()) {
         count++;
       }
@@ -114,14 +183,14 @@ public class TestLocalMetadataStore extends MetadataStoreTestBase {
     return count;
   }
 
-  private static void assertClearResult(Map <Path, PathMetadata> map,
+  private static void assertClearResult(Cache<Path, PathMetadata> cache,
       String prefixStr, String pathStr, int leftoverSize) {
-    populateMap(map, prefixStr);
-    LocalMetadataStore.deleteHashByAncestor(new Path(prefixStr + pathStr), map,
-        true);
-    assertEquals(String.format("Map should have %d entries", leftoverSize),
-        leftoverSize, sizeOfMap(map));
-    map.clear();
+    populateMap(cache, prefixStr);
+    LocalMetadataStore.deleteEntryByAncestor(new Path(prefixStr + pathStr),
+        cache, true);
+    assertEquals(String.format("Cache should have %d entries", leftoverSize),
+        leftoverSize, sizeOfMap(cache));
+    cache.invalidateAll();
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message