Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 5AF2E200C5B for ; Wed, 5 Apr 2017 04:49:55 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 597FB160BA1; Wed, 5 Apr 2017 02:49:55 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 53247160B90 for ; Wed, 5 Apr 2017 04:49:54 +0200 (CEST) Received: (qmail 15251 invoked by uid 500); 5 Apr 2017 02:49:53 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 15240 invoked by uid 99); 5 Apr 2017 02:49:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Apr 2017 02:49:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C12EEDFCB3; Wed, 5 Apr 2017 02:49:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: liuml07@apache.org To: common-commits@hadoop.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hadoop git commit: HADOOP-13926. S3Guard: S3AFileSystem::listLocatedStatus() to employ MetadataStore. Contributed by Mingliang Liu, Rajesh Balamohan and Aaron Fabbri Date: Wed, 5 Apr 2017 02:49:52 +0000 (UTC) archived-at: Wed, 05 Apr 2017 02:49:55 -0000 Repository: hadoop Updated Branches: refs/heads/HADOOP-13345 48bda91e4 -> 72bc8767e HADOOP-13926. S3Guard: S3AFileSystem::listLocatedStatus() to employ MetadataStore. 
Contributed by Mingliang Liu, Rajesh Balamohan and Aaron Fabbri Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/72bc8767 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/72bc8767 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/72bc8767 Branch: refs/heads/HADOOP-13345 Commit: 72bc8767e289362bf2283731667eb9d148e5f5ee Parents: 48bda91 Author: Mingliang Liu Authored: Mon Apr 3 16:56:22 2017 -0700 Committer: Mingliang Liu Committed: Tue Apr 4 19:49:34 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/hadoop/fs/s3a/Listing.java | 129 ++++++++++++++++++- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 18 ++- .../apache/hadoop/fs/s3a/s3guard/S3Guard.java | 4 + .../fs/s3a/ITestS3GuardListConsistency.java | 69 +++++++++- 4 files changed, 211 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 135428a..c9366af 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -27,13 +27,18 @@ import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RemoteIterator; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import java.io.IOException; import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.ListIterator; import 
java.util.NoSuchElementException; +import java.util.Set; import static org.apache.hadoop.fs.s3a.Constants.S3N_FOLDER_SUFFIX; import static org.apache.hadoop.fs.s3a.S3AUtils.createFileStatus; @@ -48,12 +53,35 @@ public class Listing { private final S3AFileSystem owner; private static final Logger LOG = S3AFileSystem.LOG; + private static final FileStatus[] EMPTY_FILE_STATUS_ARRAY = new FileStatus[0]; public Listing(S3AFileSystem owner) { this.owner = owner; } /** + * Create a FileStatus iterator against a provided list of file status, with + * a given status filter. + * + * @param fileStatuses the provided list of file status. NO remote calls. + * @param filter file status filter + * @return the file status iterator + */ + ProvidedLocatedFileStatusIterator createProvidedLocatedFileStatusIterator( + FileStatus[] fileStatuses, ProvidedFileStatusFilter filter) { + return new ProvidedLocatedFileStatusIterator(fileStatuses, filter); + } + + FileStatusListingIterator createFileStatusListingIterator( + Path listPath, + ListObjectsRequest request, + PathFilter filter, + Listing.FileStatusAcceptor acceptor) throws IOException { + return createFileStatusListingIterator(listPath, request, filter, acceptor, + EMPTY_FILE_STATUS_ARRAY); + } + + /** * Create a FileStatus iterator against a path, with a given * list object request. * @param listPath path of the listing @@ -61,6 +89,8 @@ public class Listing { * @param filter the filter on which paths to accept * @param acceptor the class/predicate to decide which entries to accept * in the listing based on the full file status. + * @param providedStatus the provided list of file status, which may contain + * items that are not listed from source. 
* @return the iterator * @throws IOException IO Problems */ @@ -68,11 +98,13 @@ public class Listing { Path listPath, ListObjectsRequest request, PathFilter filter, - Listing.FileStatusAcceptor acceptor) throws IOException { + Listing.FileStatusAcceptor acceptor, + FileStatus[] providedStatus) throws IOException { return new FileStatusListingIterator( new ObjectListingIterator(listPath, request), filter, - acceptor); + acceptor, + providedStatus); } /** @@ -169,6 +201,51 @@ public class Listing { } /** + * Filter out a FileStatus object, unlike {@link PathFilter} against a path. + */ + interface ProvidedFileStatusFilter { + boolean accept(FileStatus status); + } + + /** + * This wraps up a provided non-null list of file status as a remote iterator. + * + * It firstly filters the provided list and later {@link #next} call will get + * from the filtered list. This suffers from scalability issues if the + * provided list is too large. + * + * There is no remote data to fetch. + */ + class ProvidedLocatedFileStatusIterator + implements RemoteIterator { + private final ArrayList filteredStatusList; + private int index = 0; + + ProvidedLocatedFileStatusIterator(FileStatus[] fileStatuses, + ProvidedFileStatusFilter filter) { + Preconditions.checkArgument(fileStatuses != null, "Null status list!"); + + filteredStatusList = new ArrayList<>(fileStatuses.length); + for (FileStatus status : fileStatuses) { + if (filter.accept(status)) { + filteredStatusList.add(status); + } + } + filteredStatusList.trimToSize(); + } + + @Override + public boolean hasNext() throws IOException { + return index < filteredStatusList.size(); + } + + @Override + public LocatedFileStatus next() throws IOException { + return owner.toLocatedFileStatus(filteredStatusList.get(index++)); + } + } + + /** * Wraps up object listing into a remote iterator which will ask for more * listing data if needed. * @@ -208,20 +285,32 @@ public class Listing { /** Iterator over the current set of results. 
*/ private ListIterator statusBatchIterator; + private final Set providedStatus; + private Iterator providedStatusIterator; + /** * Create an iterator over file status entries. * @param source the listing iterator from a listObjects call. * @param filter the filter on which paths to accept * @param acceptor the class/predicate to decide which entries to accept * in the listing based on the full file status. + * @param providedStatus the provided list of file status, which may contain + * items that are not listed from source. * @throws IOException IO Problems */ FileStatusListingIterator(ObjectListingIterator source, PathFilter filter, - FileStatusAcceptor acceptor) throws IOException { + FileStatusAcceptor acceptor, + FileStatus[] providedStatus) throws IOException { this.source = source; this.filter = filter; this.acceptor = acceptor; + this.providedStatus = new HashSet<>(providedStatus.length); + for (FileStatus status : providedStatus) { + if (filter.accept(status.getPath())) { + this.providedStatus.add(status); + } + } // build the first set of results. This will not trigger any // remote IO, assuming the source iterator is in its initial // iteration @@ -233,20 +322,46 @@ public class Listing { * If there is data in the local filtered list, return true. * Else: request more data until that condition is met, or there * is no more remote listing data. + * Lastly, return true if the provided file status still has items left. * @return true if a call to {@link #next()} will succeed. 
* @throws IOException */ @Override public boolean hasNext() throws IOException { - return statusBatchIterator.hasNext() || requestNextBatch(); + return sourceHasNext() || providedStatusIterator.hasNext(); + } + + private boolean sourceHasNext() throws IOException { + if (statusBatchIterator.hasNext() || requestNextBatch()) { + return true; + } else { + // turn to file status that are only in provided list + if (providedStatusIterator == null) { + LOG.debug("Start iterating the provided status."); + providedStatusIterator = providedStatus.iterator(); + } + return false; + } } @Override public FileStatus next() throws IOException { - if (!hasNext()) { - throw new NoSuchElementException(); + final FileStatus status; + if (sourceHasNext()) { + status = statusBatchIterator.next(); + // We remove from provided list the file status listed by S3 so that + // this does not return duplicate items. + LOG.debug("Removing the status from provided file status {}", status); + providedStatus.remove(status); + } else { + if (providedStatusIterator.hasNext()) { + status = providedStatusIterator.next(); + LOG.debug("Returning provided file status {}", status); + } else { + throw new NoSuchElementException(); + } } - return statusBatchIterator.next(); + return status; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 553e442..129d4ae 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -2486,12 +2486,28 @@ public class S3AFileSystem extends FileSystem { filter.accept(path) ? 
toLocatedFileStatus(fileStatus) : null); } else { // directory: trigger a lookup + final DirListingMetadata dirMeta = metadataStore.listChildren(path); + if (allowAuthoritative + && dirMeta != null + && dirMeta.isAuthoritative()) { + return listing.createProvidedLocatedFileStatusIterator( + S3Guard.dirMetaToStatuses(dirMeta), + new Listing.ProvidedFileStatusFilter() { + @Override + public boolean accept(FileStatus status) { + return filter.accept(status.getPath()); + } + }); + } + String key = maybeAddTrailingSlash(pathToKey(path)); return listing.createLocatedFileStatusIterator( listing.createFileStatusListingIterator(path, createListObjectsRequest(key, "/"), filter, - new Listing.AcceptAllButSelfAndS3nDirs(path))); + new Listing.AcceptAllButSelfAndS3nDirs(path), + S3Guard.dirMetaToStatuses(dirMeta) + )); } } catch (AmazonClientException e) { throw translateException("listLocatedStatus", path, e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java index 360b561..64afa93 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java @@ -134,6 +134,10 @@ public final class S3Guard { } public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) { + if (dirMeta == null) { + return new FileStatus[0]; + } + Collection listing = dirMeta.getListing(); FileStatus[] statuses = new FileStatus[listing.size()]; http://git-wip-us.apache.org/repos/asf/hadoop/blob/72bc8767/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java 
---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java index f59b80d..0e62bdf 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.contract.AbstractFSContract; import org.apache.hadoop.fs.contract.s3a.S3AContract; import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; @@ -48,7 +50,7 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase { } @Test - public void testConsistentList() throws Exception { + public void testConsistentListStatus() throws Exception { S3AFileSystem fs = getFileSystem(); @@ -80,6 +82,71 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase { assertTrue(list.contains(inconsistentPath)); } + /** + * Similar to {@link #testConsistentListStatus()}, this tests that the FS + * listLocatedStatus() call will return consistent list. + */ + @Test + public void testConsistentListLocatedStatus() throws Exception { + final S3AFileSystem fs = getFileSystem(); + // This test will fail if NullMetadataStore (the default) is configured: + // skip it. 
+ Assume.assumeTrue(fs.hasMetadataStore()); + fs.mkdirs(path("doTestConsistentListLocatedStatus")); + + final int[] numOfPaths = {0, 1, 10}; + for (int normalPathNum : numOfPaths) { + for (int delayedPathNum : numOfPaths) { + LOG.info("Testing with normalPathNum={}, delayedPathNum={}", + normalPathNum, delayedPathNum); + doTestConsistentListLocatedStatus(fs, normalPathNum, delayedPathNum); + } + } + } + + /** + * Helper method to implement the tests of consistent listLocatedStatus(). + * @param fs The S3 file system from contract + * @param normalPathNum number of paths listed directly from S3 without delaying + * @param delayedPathNum number of paths listed with delaying + * @throws Exception + */ + private void doTestConsistentListLocatedStatus(S3AFileSystem fs, + int normalPathNum, int delayedPathNum) throws Exception { + final List testDirs = new ArrayList<>(normalPathNum + delayedPathNum); + int index = 0; + for (; index < normalPathNum; index++) { + testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index)); + } + for (; index < normalPathNum + delayedPathNum; index++) { + // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed + // in listObjects() results via InconsistentAmazonS3Client + testDirs.add(path("doTestConsistentListLocatedStatus/dir-" + index + + InconsistentAmazonS3Client.DELAY_KEY_SUBSTRING)); + } + + for (Path path : testDirs) { + // delete the old test path (if any) so that when we call mkdirs() later, + // the to-be-delayed directories will be tracked via the putObject() request. 
+ fs.delete(path, true); + assertTrue(fs.mkdirs(path)); + } + + // this should return the union data from S3 and MetadataStore + final RemoteIterator statusIterator = + fs.listLocatedStatus(path("doTestConsistentListLocatedStatus/")); + List list = new ArrayList<>(); + for (; statusIterator.hasNext();) { + list.add(statusIterator.next().getPath()); + } + + // This should fail without S3Guard, and succeed with it because part of the + // children under test path are delaying visibility + for (Path path : testDirs) { + assertTrue("listLocatedStatus should list " + path, list.contains(path)); + } + } + @Test public void testListStatusWriteBack() throws Exception { Assume.assumeTrue(getFileSystem().hasMetadataStore()); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org