hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From l..@apache.org
Subject [18/21] hadoop git commit: add getFileStatus and listStatus()
Date Fri, 08 Jul 2016 21:30:25 GMT
add getFileStatus and listStatus()


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8fb665dc
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8fb665dc
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8fb665dc

Branch: refs/heads/s3_create
Commit: 8fb665dc5a78e237d52b927d7043df59387b3f46
Parents: a281799
Author: Lei Xu <lei@cloudera.com>
Authored: Thu Jul 7 21:50:40 2016 -0700
Committer: Lei Xu <lei@cloudera.com>
Committed: Thu Jul 7 21:50:40 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/fs/s3a/CachedDirectory.java   | 49 +++++++++++-------
 .../apache/hadoop/fs/s3a/CachedFileStatus.java  | 14 +++++-
 .../hadoop/fs/s3a/DynamoDBMetadataStore.java    |  2 +-
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java |  6 +++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 53 ++++++++++++++++++--
 .../hadoop/fs/s3a/TestLocalMetadataStore.java   | 17 +++----
 6 files changed, 106 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java
index 5f0f606..0ea9436 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedDirectory.java
@@ -1,11 +1,14 @@
 package org.apache.hadoop.fs.s3a;
 
+import com.google.common.collect.ImmutableCollection;
+import com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
-import java.util.ArrayList;
-import java.util.Arrays;
+import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Result of a listStatus() call for placing into a coherent metadata cache.
@@ -17,12 +20,12 @@ import java.util.Collection;
  */
 public class CachedDirectory {
 
-  public static final FileStatus[] EMPTY_DIR = {};
+  public static final S3AFileStatus[] EMPTY_DIR = {};
   protected Path path;
 
   /** TODO optimize out initial copy: use wrapped Arrays.asList() initially
    *  and change to ArrayList only on add/delete. */
-  protected ArrayList<FileStatus> fileStatuses;
+  protected Map<Path, CachedFileStatus> statusMap = new HashMap<>();
 
   /**
    * True iff this CachedDirectory contained the same set of files as actually
@@ -35,9 +38,12 @@ public class CachedDirectory {
    */
   protected boolean isFullyCached;
 
-  public CachedDirectory(Path path, FileStatus[] fileStatuses, boolean isFullyCached) {
+  public CachedDirectory(Path path, S3AFileStatus[] fileStatuses, boolean isFullyCached) {
     this.path = path;
-    this.fileStatuses = new ArrayList<>(Arrays.asList(fileStatuses));
+    for (S3AFileStatus s : fileStatuses) {
+      // TODO generate a lot of garbage
+      statusMap.put(s.getPath(), new CachedFileStatus(s));
+    }
     this.isFullyCached = isFullyCached;
   }
 
@@ -45,18 +51,21 @@ public class CachedDirectory {
     return path;
   }
 
-  public FileStatus[] getFileStatuses() {
-    FileStatus[] statuses = new FileStatus[fileStatuses.size()];
-    return fileStatuses.toArray(statuses);
+  public Collection<CachedFileStatus> getFileStatuses() {
+    return statusMap.values();
   }
 
   public boolean isFullyCached() {
     return isFullyCached;
   }
 
+  public void setFullyCached() {
+    isFullyCached = true;
+  }
+
   /** Add given file to this directory.  Does not check for duplicates. */
-  public void addFile(FileStatus status)  {
-    fileStatuses.add(status);
+  public void addFile(S3AFileStatus status)  {
+    statusMap.put(status.getPath(), new CachedFileStatus(status));
   }
 
   /**
@@ -64,21 +73,25 @@ public class CachedDirectory {
    * @return true iff path was found and removed.
    */
   public boolean removeFile(Path path) {
-    for (int i = 0; i < fileStatuses.size(); i++) {
-      FileStatus s = fileStatuses.get(i);
-      if (s.getPath().equals(path)) {
-        fileStatuses.remove(i);
-        return true;
-      }
+    if (statusMap.containsKey(path.toString())) {
+      statusMap.get(path.toString()).delete();
     }
     return false;
   }
 
+  public boolean has(Path path) {
+    return statusMap.containsKey(path);
+  }
+
+  public void flush() throws IOException {
+    // TODO: Save to database.
+  }
+
   @Override
   public String toString() {
     return "CachedDirectory{" +
         "path=" + path +
-        ", fileStatuses=" + fileStatuses +
+        ", fileStatuses=" + statusMap +
         ", isFullyCached=" + isFullyCached +
         '}';
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java
index 4b0156e..dcc1f44 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/CachedFileStatus.java
@@ -9,19 +9,29 @@ import org.apache.hadoop.fs.FileStatus;
  */
 @InterfaceAudience.LimitedPrivate("HDFS")
 @InterfaceStability.Evolving
-public class CachedFileStatus {
+public class CachedFileStatus extends S3AFileStatus {
 
   // TODO this may change to inheritance, not sure yet.
   protected S3AFileStatus fileStatus;
 
+  boolean isDeleted;
+
   public CachedFileStatus(S3AFileStatus fileStatus) {
-    this.fileStatus = fileStatus;
+    super(fileStatus);
+    isDeleted = false;
   }
 
   public S3AFileStatus getFileStatus() {
     return fileStatus;
   }
 
+  public boolean isDeleted() {
+    return isDeleted;
+  }
+
+  public void delete() {
+    isDeleted = true;
+  }
   @Override
   public String toString() {
     return "CachedFileStatus{" +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java
index efc2fb3..061369d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/DynamoDBMetadataStore.java
@@ -132,7 +132,7 @@ public class DynamoDBMetadataStore implements MetadataStore, Closeable {
 
       // XXX TODO this object needs to map cleanly to/from FileStatus
       //  hacking broken values for now
-      FileStatus fs = new FileStatus(0, isdir, 0, 0, 0, getPath());
+      S3AFileStatus fs = new S3AFileStatus(0, 0, getPath(), 0);
       CachedFileStatus cfs = new CachedFileStatus(fs);
       return cfs;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index 75a6500..b633f84 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -33,6 +33,12 @@ import org.apache.hadoop.fs.Path;
 public class S3AFileStatus extends FileStatus {
   private boolean isEmptyDirectory;
 
+  S3AFileStatus(S3AFileStatus other) {
+    super(other.getLen(), other.isDirectory(), 1, other.getBlockSize(),
+        other.getModificationTime(), other.getPath());
+    isEmptyDirectory = other.isEmptyDirectory();
+  }
+
   // Directories
   public S3AFileStatus(boolean isdir, boolean isemptydir, Path path) {
     super(0, isdir, 1, 0, 0, path);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 3299f2b..55540e7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -25,7 +25,10 @@ import java.io.InputStream;
 import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
@@ -61,6 +64,7 @@ import com.amazonaws.event.ProgressEvent;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -1155,7 +1159,46 @@ public class S3AFileSystem extends FileSystem {
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
     try {
-      return innerListStatus(f);
+      CachedDirectory cachedDir = null;
+      if (metadataStore != null) {
+        cachedDir = metadataStore.listStatus(f);
+        if (cachedDir != null && cachedDir.isFullyCached()) {
+          return cachedDir.getFileStatuses().toArray(new FileStatus[0]);
+        }
+      }
+
+      S3AFileStatus[] statuses = innerListStatus(f);
+      if (cachedDir != null) {
+        // Merge the view from the external metadata store.
+        Map<Path, S3AFileStatus> baseView = new HashMap<>();
+        for (S3AFileStatus s : statuses) {
+          baseView.put(s.getPath(), s);
+        }
+        Collection<CachedFileStatus> consistentView =
+            cachedDir.getFileStatuses();
+        for (CachedFileStatus cfs : consistentView) {
+          if (cfs.isDeleted) {
+            if (baseView.containsKey(cfs.getPath())) {
+              baseView.remove(cfs.getPath());
+            }
+          } else {
+            baseView.put(cfs.getPath(), cfs.getFileStatus());
+          }
+        }
+        // Add missing files back to the cache so that we can set cachedDir as
+        // fully cached.
+        for (S3AFileStatus status : baseView.values()) {
+          if (!cachedDir.has(status.getPath())) {
+            cachedDir.addFile(status);
+          }
+        }
+        cachedDir.setFullyCached();
+
+        cachedDir.flush();  // TODO async save
+        statuses = baseView.values().toArray(new S3AFileStatus[0]);
+      }
+      return statuses;
+
     } catch (AmazonClientException e) {
       throw translateException("listStatus", f, e);
     }
@@ -1171,14 +1214,14 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException due to an IO problem.
    * @throws AmazonClientException on failures inside the AWS SDK
    */
-  public FileStatus[] innerListStatus(Path f) throws FileNotFoundException,
+  public S3AFileStatus[] innerListStatus(Path f) throws FileNotFoundException,
       IOException, AmazonClientException {
     String key = pathToKey(f);
     LOG.debug("List status for path: {}", f);
     incrementStatistic(INVOCATION_LIST_STATUS);
 
-    final List<FileStatus> result = new ArrayList<FileStatus>();
-    final FileStatus fileStatus =  getFileStatus(f);
+    final List<S3AFileStatus> result = new ArrayList<>();
+    final S3AFileStatus fileStatus =  getFileStatus(f);
 
     if (fileStatus.isDirectory()) {
       if (!key.isEmpty()) {
@@ -1232,7 +1275,7 @@ public class S3AFileSystem extends FileSystem {
       result.add(fileStatus);
     }
 
-    return result.toArray(new FileStatus[result.size()]);
+    return result.toArray(new S3AFileStatus[result.size()]);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8fb665dc/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java
index 4af60ea..6e3e9a6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestLocalMetadataStore.java
@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -199,7 +200,7 @@ public class TestLocalMetadataStore extends Assert {
     lms.putNew(new CachedFileStatus(makeFileStatus("/a1/b1/file2", 100)));
   }
 
-  private void assertFileStatusesEqual(FileStatus[] statuses, String ...pathStrs) {
+  private void assertFileStatusesEqual(Collection<? extends FileStatus> statuses, String ...pathStrs) {
     Set<Path> a = new HashSet<>();
     for (FileStatus fs : statuses) {
       a.add(fs.getPath());
@@ -224,7 +225,7 @@ public class TestLocalMetadataStore extends Assert {
     CachedDirectory dir = lms.listStatus(new Path(pathStr));
     assertNotNull("Directory " + pathStr + " in cache", dir);
     assertEquals("Number of entries in dir " + pathStr,
-        dir.getFileStatuses().length, size);
+        dir.getFileStatuses().size(), size);
   }
 
   private void assertNotCached(String pathStr) throws IOException {
@@ -244,15 +245,13 @@ public class TestLocalMetadataStore extends Assert {
     }
   }
 
-  private FileStatus makeFileStatus(String pathStr, long length) {
-    Path f = new Path(pathStr);
-    return new FileStatus(length, false /* not dir */, REPLICATION,
-        BLOCK_SIZE, modTime, accessTime, fsPermission, OWNER, GROUP, f);
+  private S3AFileStatus makeFileStatus(String path, long length) {
+    Path f = new Path(path);
+    return new S3AFileStatus(length, modTime, f, BLOCK_SIZE);
   }
 
-  private FileStatus makeDirStatus(String pathStr) {
+  private S3AFileStatus makeDirStatus(String pathStr) {
     Path f = new Path(pathStr);
-    return new FileStatus(0, true /* dir */, REPLICATION,
-        0, modTime, accessTime, fsPermission, OWNER, GROUP, f);
+    return new S3AFileStatus(true, true, f);
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message