hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lium...@apache.org
Subject hadoop git commit: HADOOP-13926. S3Guard: S3AFileSystem::listLocatedStatus() to employ MetadataStore. Contributed by Mingliang Liu, Rajesh Balamohan and Aaron Fabbri
Date Wed, 05 Apr 2017 02:49:52 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HADOOP-13345 48bda91e4 -> 72bc8767e


HADOOP-13926. S3Guard: S3AFileSystem::listLocatedStatus() to employ MetadataStore. Contributed
by Mingliang Liu, Rajesh Balamohan and Aaron Fabbri


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72bc8767
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72bc8767
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72bc8767

Branch: refs/heads/HADOOP-13345
Commit: 72bc8767e289362bf2283731667eb9d148e5f5ee
Parents: 48bda91
Author: Mingliang Liu <liuml07@apache.org>
Authored: Mon Apr 3 16:56:22 2017 -0700
Committer: Mingliang Liu <liuml07@apache.org>
Committed: Tue Apr 4 19:49:34 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/s3a/Listing.java  | 129 ++++++++++++++++++-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  18 ++-
 .../apache/hadoop/fs/s3a/s3guard/S3Guard.java   |   4 +
 .../fs/s3a/ITestS3GuardListConsistency.java     |  69 +++++++++-
 4 files changed, 211 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 135428a..c9366af 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -27,13 +27,18 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
 import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.NoSuchElementException;
+import java.util.Set;
 
 import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX;
 import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus;
@@ -48,12 +53,35 @@ public class Listing {
 
   private final S3AFileSystem owner;
   private static final Logger LOG = S3AFileSystem.LOG;
+  private static final FileStatus[] EMPTY_FILE_STATUS_ARRAY = new FileStatus[0];
 
   public Listing(S3AFileSystem owner) {
     this.owner = owner;
   }
 
   /**
+   * Create a FileStatus iterator against a provided list of file status, with
+   * a given status filter.
+   *
+   * @param fileStatuses the provided list of file status. NO remote calls.
+   * @param filter file status filter
+   * @return the file status iterator
+   */
+  ProvidedLocatedFileStatusIterator createProvidedLocatedFileStatusIterator(
+      FileStatus[] fileStatuses, ProvidedFileStatusFilter filter) {
+    return new ProvidedLocatedFileStatusIterator(fileStatuses, filter);
+  }
+
+  FileStatusListingIterator createFileStatusListingIterator(
+      Path listPath,
+      ListObjectsRequest request,
+      PathFilter filter,
+      Listing.FileStatusAcceptor acceptor) throws IOException {
+    return createFileStatusListingIterator(listPath, request, filter, acceptor,
+        EMPTY_FILE_STATUS_ARRAY);
+  }
+
+  /**
    * Create a FileStatus iterator against a path, with a given
    * list object request.
    * @param listPath path of the listing
@@ -61,6 +89,8 @@ public class Listing {
    * @param filter the filter on which paths to accept
    * @param acceptor the class/predicate to decide which entries to accept
    * in the listing based on the full file status.
+   * @param providedStatus the provided list of file status, which may contain
+   *                       items that are not listed from source.
    * @return the iterator
    * @throws IOException IO Problems
    */
@@ -68,11 +98,13 @@ public class Listing {
       Path listPath,
       ListObjectsRequest request,
       PathFilter filter,
-      Listing.FileStatusAcceptor acceptor) throws IOException {
+      Listing.FileStatusAcceptor acceptor,
+      FileStatus[] providedStatus) throws IOException {
     return new FileStatusListingIterator(
         new ObjectListingIterator(listPath, request),
         filter,
-        acceptor);
+        acceptor,
+        providedStatus);
   }
 
   /**
@@ -169,6 +201,51 @@ public class Listing {
   }
 
   /**
+   * Filter out a FileStatus object, unlike {@link PathFilter} against a path.
+   */
+  interface ProvidedFileStatusFilter {
+    boolean accept(FileStatus status);
+  }
+
+  /**
+   * This wraps up a provided non-null list of file status as a remote iterator.
+   *
+   * It firstly filters the provided list and later {@link #next} call will get
+   * from the filtered list. This suffers from scalability issues if the
+   * provided list is too large.
+   *
+   * There is no remote data to fetch.
+   */
+  class ProvidedLocatedFileStatusIterator
+      implements RemoteIterator<LocatedFileStatus> {
+    private final ArrayList<FileStatus> filteredStatusList;
+    private int index = 0;
+
+    ProvidedLocatedFileStatusIterator(FileStatus[] fileStatuses,
+        ProvidedFileStatusFilter filter) {
+      Preconditions.checkArgument(fileStatuses != null, "Null status list!");
+
+      filteredStatusList = new ArrayList<>(fileStatuses.length);
+      for (FileStatus status : fileStatuses) {
+        if (filter.accept(status)) {
+          filteredStatusList.add(status);
+        }
+      }
+      filteredStatusList.trimToSize();
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return index < filteredStatusList.size();
+    }
+
+    @Override
+    public LocatedFileStatus next() throws IOException {
+      return owner.toLocatedFileStatus(filteredStatusList.get(index++));
+    }
+  }
+
+  /**
    * Wraps up object listing into a remote iterator which will ask for more
    * listing data if needed.
    *
@@ -208,20 +285,32 @@ public class Listing {
     /** Iterator over the current set of results. */
     private ListIterator<FileStatus> statusBatchIterator;
 
+    private final Set<FileStatus> providedStatus;
+    private Iterator<FileStatus> providedStatusIterator;
+
     /**
      * Create an iterator over file status entries.
      * @param source the listing iterator from a listObjects call.
      * @param filter the filter on which paths to accept
      * @param acceptor the class/predicate to decide which entries to accept
      * in the listing based on the full file status.
+     * @param providedStatus the provided list of file status, which may contain
+     *                       items that are not listed from source.
      * @throws IOException IO Problems
      */
     FileStatusListingIterator(ObjectListingIterator source,
         PathFilter filter,
-        FileStatusAcceptor acceptor) throws IOException {
+        FileStatusAcceptor acceptor,
+        FileStatus[] providedStatus) throws IOException {
       this.source = source;
       this.filter = filter;
       this.acceptor = acceptor;
+      this.providedStatus = new HashSet<>(providedStatus.length);
+      for (FileStatus status : providedStatus) {
+        if (filter.accept(status.getPath())) {
+          this.providedStatus.add(status);
+        }
+      }
       // build the first set of results. This will not trigger any
       // remote IO, assuming the source iterator is in its initial
       // iteration
@@ -233,20 +322,46 @@ public class Listing {
      * If there is data in the local filtered list, return true.
      * Else: request more data util that condition is met, or there
      * is no more remote listing data.
+     * Lastly, return true if the provided file status has left items.
      * @return true if a call to {@link #next()} will succeed.
      * @throws IOException
      */
     @Override
     public boolean hasNext() throws IOException {
-      return statusBatchIterator.hasNext() || requestNextBatch();
+      return sourceHasNext() || providedStatusIterator.hasNext();
+    }
+
+    private boolean sourceHasNext() throws IOException {
+      if (statusBatchIterator.hasNext() || requestNextBatch()) {
+        return true;
+      } else {
+        // turn to file status that are only in provided list
+        if (providedStatusIterator == null) {
+          LOG.debug("Start iterating the provided status.");
+          providedStatusIterator = providedStatus.iterator();
+        }
+        return false;
+      }
     }
 
     @Override
     public FileStatus next() throws IOException {
-      if (!hasNext()) {
-        throw new NoSuchElementException();
+      final FileStatus status;
+      if (sourceHasNext()) {
+        status = statusBatchIterator.next();
+        // We remove from provided list the file status listed by S3 so that
+        // this does not return duplicate items.
+        LOG.debug("Removing the status from provided file status {}", status);
+        providedStatus.remove(status);
+      } else {
+        if (providedStatusIterator.hasNext()) {
+          status = providedStatusIterator.next();
+          LOG.debug("Returning provided file status {}", status);
+        } else {
+          throw new NoSuchElementException();
+        }
       }
-      return statusBatchIterator.next();
+      return status;
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 553e442..129d4ae 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -2486,12 +2486,28 @@ public class S3AFileSystem extends FileSystem {
             filter.accept(path) ? toLocatedFileStatus(fileStatus) : null);
       } else {
         // directory: trigger a lookup
+        final DirListingMetadata dirMeta = metadataStore.listChildren(path);
+        if (allowAuthoritative
+            && dirMeta != null
+            && dirMeta.isAuthoritative()) {
+          return listing.createProvidedLocatedFileStatusIterator(
+              S3Guard.dirMetaToStatuses(dirMeta),
+              new Listing.ProvidedFileStatusFilter() {
+                @Override
+                public boolean accept(FileStatus status) {
+                  return filter.accept(status.getPath());
+                }
+              });
+        }
+
         String key = maybeAddTrailingSlash(pathToKey(path));
         return listing.createLocatedFileStatusIterator(
             listing.createFileStatusListingIterator(path,
                 createListObjectsRequest(key, "/"),
                 filter,
-                new Listing.AcceptAllButSelfAndS3nDirs(path)));
+                new Listing.AcceptAllButSelfAndS3nDirs(path),
+                S3Guard.dirMetaToStatuses(dirMeta)
+            ));
       }
     } catch (AmazonClientException e) {
       throw translateException("listLocatedStatus", path, e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
index 360b561..64afa93 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
@@ -134,6 +134,10 @@ public final class S3Guard {
   }
 
   public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta)  {
+    if (dirMeta == null) {
+      return new FileStatus[0];
+    }
+
     Collection<PathMetadata> listing = dirMeta.getListing();
     FileStatus[] statuses = new FileStatus[listing.size()];
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index f59b80d..0e62bdf 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata;
@@ -48,7 +50,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
   }
 
   @Test
-  public void testConsistentList() throws Exception {
+  public void testConsistentListStatus() throws Exception {
 
     S3AFileSystem fs = getFileSystem();
 
@@ -80,6 +82,71 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     assertTrue(list.contains(inconsistentPath));
   }
 
+  /**
+   * Similar to {@link #testConsistentListStatus()}, this tests that the FS
+   * listLocatedStatus() call will return consistent list.
+   */
+  @Test
+  public void testConsistentListLocatedStatus() throws Exception {
+    final S3AFileSystem fs = getFileSystem();
+    // This test will fail if NullMetadataStore (the default) is configured:
+    // skip it.
+    Assume.assumeTrue(fs.hasMetadataStore());
+    fs.mkdirs(path("doTestConsistentListLocatedStatus"));
+
+    final int[] numOfPaths = {0, 1, 10};
+    for (int normalPathNum : numOfPaths) {
+      for (int delayedPathNum : numOfPaths) {
+        LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
+            normalPathNum, delayedPathNum);
+        doTestConsistentListLocatedStatus(fs, normalPathNum, delayedPathNum);
+      }
+    }
+  }
+
+  /**
+   * Helper method to implement the tests of consistent listLocatedStatus().
+   * @param fs The S3 file system from contract
+   * @param normalPathNum number of paths listed directly from S3 without delay
+   * @param delayedPathNum number of paths whose listing visibility is delayed
+   * @throws Exception
+   */
+  private void doTestConsistentListLocatedStatus(S3AFileSystem fs,
+      int normalPathNum, int delayedPathNum) throws Exception {
+    final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
+    int index = 0;
+    for (; index < normalPathNum; index++) {
+      testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index));
+    }
+    for (; index < normalPathNum + delayedPathNum; index++) {
+      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
+      // in listObjects() results via InconsistentAmazonS3Client
+      testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index
+          + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING));
+    }
+
+    for (Path path : testDirs) {
+      // delete the old test path (if any) so that when we call mkdirs() later,
+      // the to-be-delayed directories will be tracked via putObject() request.
+      fs.delete(path, true);
+      assertTrue(fs.mkdirs(path));
+    }
+
+    // this should return the union data from S3 and MetadataStore
+    final RemoteIterator<LocatedFileStatus> statusIterator =
+        fs.listLocatedStatus(path("doTestConsistentListLocatedStatus/"));
+    List<Path> list = new ArrayList<>();
+    for (; statusIterator.hasNext();) {
+      list.add(statusIterator.next().getPath());
+    }
+
+    // This should fail without S3Guard, and succeed with it because part of the
+    // children under test path are delaying visibility
+    for (Path path : testDirs) {
+      assertTrue("listLocatedStatus should list " + path, list.contains(path));
+    }
+  }
+
   @Test
   public void testListStatusWriteBack() throws Exception {
     Assume.assumeTrue(getFileSystem().hasMetadataStore());


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message