From: asuresh@apache.org
To: common-commits@hadoop.apache.org
Date: Mon, 11 Sep 2017 06:48:58 -0000
Subject: [33/50] [abbrv] hadoop git commit: HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A. Contributed by Aaron Fabbri

HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A.
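The commit adds a new fs.s3a.list.version property: 2 (the default) selects the newer ListObjectsV2 call, 1 selects the older API, and any other value is forced back to 2 with a warning. As a minimal sketch — not part of the commit, with a placeholder bucket URI and class name, and no error handling — a client could opt back into the v1 listing like this:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class S3AListVersionExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 2 (default) uses listObjectsV2(); 1 falls back to the older listObjects().
    conf.setInt("fs.s3a.list.version", 1);
    // Placeholder bucket; any reachable s3a:// URI will do.
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    // Directory listings issued through this client now use the v1 API.
    fs.listStatus(new Path("/"));
  }
}

The same setting is used by the new ITestS3AContractGetFileStatusV1List test below to run the getFileStatus contract tests against the v1 list path. The commit details and diff follow.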
Contributed by Aaron Fabbri Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5bbca804 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5bbca804 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5bbca804 Branch: refs/heads/YARN-5972 Commit: 5bbca80428ffbe776650652de86a3bba885edb31 Parents: ab8368d Author: Steve Loughran Authored: Fri Sep 8 12:07:02 2017 +0100 Committer: Steve Loughran Committed: Fri Sep 8 12:07:02 2017 +0100 ---------------------------------------------------------------------- .../src/main/resources/core-default.xml | 9 ++ .../org/apache/hadoop/fs/s3a/Constants.java | 9 ++ .../fs/s3a/InconsistentAmazonS3Client.java | 143 ++++++++++++++++--- .../java/org/apache/hadoop/fs/s3a/Listing.java | 22 +-- .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 91 ++++++++---- .../org/apache/hadoop/fs/s3a/S3ListRequest.java | 69 +++++++++ .../org/apache/hadoop/fs/s3a/S3ListResult.java | 97 +++++++++++++ .../src/site/markdown/tools/hadoop-aws/index.md | 9 ++ .../ITestS3AContractGetFileStatusV1List.java | 59 ++++++++ .../fs/s3a/ITestS3GuardListConsistency.java | 22 +-- .../hadoop/fs/s3a/TestS3AGetFileStatus.java | 41 +++--- 11 files changed, 492 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 9e2c553..23739b0 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1428,6 +1428,15 @@ The implementation class of the S3A AbstractFileSystem. + + fs.s3a.list.version + 2 + + Select which version of the S3 SDK's List Objects API to use. Currently + support 2 (default) and 1 (older API). + + + fs.wasb.impl http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java index 1a464d0..4e2af3a 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java @@ -451,4 +451,13 @@ public final class Constants { public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY = "fs.s3a.failinject.inconsistency.probability"; + /** + * S3 API level parameters. 
+ */ + @InterfaceStability.Unstable + public static final String LIST_VERSION = "fs.s3a.list.version"; + + @InterfaceStability.Unstable + public static final int DEFAULT_LIST_VERSION = 2; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java index 5e9cb3f..6476f5d 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java @@ -28,6 +28,8 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest; import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.DeleteObjectsResult; import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.PutObjectRequest; import com.amazonaws.services.s3.model.PutObjectResult; @@ -109,8 +111,10 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { } } - /** Map of key to delay -> time it was deleted + object summary (object - * summary is null for prefixes. */ + /** + * Map of key to delay -> time it was deleted + object summary (object summary + * is null for prefixes. + */ private Map delayedDeletes = new HashMap<>(); /** Map of key to delay -> time it was created. */ @@ -196,17 +200,29 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { return super.putObject(putObjectRequest); } - /* We should only need to override this version of listObjects() */ + /* We should only need to override these versions of listObjects() */ @Override public ObjectListing listObjects(ListObjectsRequest listObjectsRequest) throws AmazonClientException, AmazonServiceException { LOG.debug("prefix {}", listObjectsRequest.getPrefix()); ObjectListing listing = super.listObjects(listObjectsRequest); - listing = filterListObjects(listObjectsRequest, listing); + listing = filterListObjects(listing); listing = restoreListObjects(listObjectsRequest, listing); return listing; } + /* We should only need to override these versions of listObjects() */ + @Override + public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request) + throws AmazonClientException, AmazonServiceException { + LOG.debug("prefix {}", request.getPrefix()); + ListObjectsV2Result listing = super.listObjectsV2(request); + listing = filterListObjectsV2(listing); + listing = restoreListObjectsV2(request, listing); + return listing; + } + + private void addSummaryIfNotPresent(List list, S3ObjectSummary item) { // Behavior of S3ObjectSummary @@ -282,21 +298,58 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { // recursive list has no delimiter, returns everything that matches a // prefix. 
boolean recursiveObjectList = !("/".equals(request.getDelimiter())); + String prefix = request.getPrefix(); + + restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix); + return new CustomObjectListing(rawListing, outputList, outputPrefixes); + } + + /** + * V2 list API variant of + * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)}. + * @param request original v2 list request + * @param result raw s3 result + */ + private ListObjectsV2Result restoreListObjectsV2(ListObjectsV2Request request, + ListObjectsV2Result result) { + List outputList = result.getObjectSummaries(); + List outputPrefixes = result.getCommonPrefixes(); + // recursive list has no delimiter, returns everything that matches a + // prefix. + boolean recursiveObjectList = !("/".equals(request.getDelimiter())); + String prefix = request.getPrefix(); + + restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix); + return new CustomListObjectsV2Result(result, outputList, outputPrefixes); + } + + + /** + * Main logic for + * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)} and + * the v2 variant above. + * @param summaries object summary list to modify. + * @param prefixes prefix list to modify + * @param recursive true if recursive list request + * @param prefix prefix for original list request + */ + private void restoreDeleted(List summaries, + List prefixes, boolean recursive, String prefix) { // Go through all deleted keys for (String key : new HashSet<>(delayedDeletes.keySet())) { Delete delete = delayedDeletes.get(key); if (isKeyDelayed(delete.time(), key)) { - if (isDescendant(request.getPrefix(), key, recursiveObjectList)) { + if (isDescendant(prefix, key, recursive)) { if (delete.summary() != null) { - addSummaryIfNotPresent(outputList, delete.summary()); + addSummaryIfNotPresent(summaries, delete.summary()); } } // Non-recursive list has delimiter: will return rolled-up prefixes for // all keys that are not direct children - if (!recursiveObjectList) { - if (isDescendant(request.getPrefix(), key, true)) { - addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key); + if (!recursive) { + if (isDescendant(prefix, key, true)) { + addPrefixIfNotPresent(prefixes, prefix, key); } } } else { @@ -304,31 +357,52 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { delayedDeletes.remove(key); } } + } + + private ObjectListing filterListObjects(ObjectListing rawListing) { + + // Filter object listing + List outputList = filterSummaries( + rawListing.getObjectSummaries()); + + // Filter prefixes (directories) + List outputPrefixes = filterPrefixes( + rawListing.getCommonPrefixes()); return new CustomObjectListing(rawListing, outputList, outputPrefixes); } - private ObjectListing filterListObjects(ListObjectsRequest request, - ObjectListing rawListing) { - + private ListObjectsV2Result filterListObjectsV2(ListObjectsV2Result raw) { // Filter object listing + List outputList = filterSummaries( + raw.getObjectSummaries()); + + // Filter prefixes (directories) + List outputPrefixes = filterPrefixes(raw.getCommonPrefixes()); + + return new CustomListObjectsV2Result(raw, outputList, outputPrefixes); + } + + private List filterSummaries( + List summaries) { List outputList = new ArrayList<>(); - for (S3ObjectSummary s : rawListing.getObjectSummaries()) { + for (S3ObjectSummary s : summaries) { String key = s.getKey(); if (!isKeyDelayed(delayedPutKeys.get(key), key)) { outputList.add(s); } } + return outputList; + } - // Filter prefixes (directories) + 
private List filterPrefixes(List prefixes) { List outputPrefixes = new ArrayList<>(); - for (String key : rawListing.getCommonPrefixes()) { + for (String key : prefixes) { if (!isKeyDelayed(delayedPutKeys.get(key), key)) { outputPrefixes.add(key); } } - - return new CustomObjectListing(rawListing, outputList, outputPrefixes); + return outputPrefixes; } private boolean isKeyDelayed(Long enqueueTime, String key) { @@ -342,7 +416,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { delayedDeletes.remove(key); LOG.debug("no longer delaying {}", key); return false; - } else { + } else { LOG.info("delaying {}", key); return true; } @@ -431,4 +505,37 @@ public class InconsistentAmazonS3Client extends AmazonS3Client { return customPrefixes; } } + + private static class CustomListObjectsV2Result extends ListObjectsV2Result { + + private final List customListing; + private final List customPrefixes; + + CustomListObjectsV2Result(ListObjectsV2Result raw, + List customListing, List customPrefixes) { + super(); + this.customListing = customListing; + this.customPrefixes = customPrefixes; + + this.setBucketName(raw.getBucketName()); + this.setCommonPrefixes(raw.getCommonPrefixes()); + this.setDelimiter(raw.getDelimiter()); + this.setEncodingType(raw.getEncodingType()); + this.setStartAfter(raw.getStartAfter()); + this.setMaxKeys(raw.getMaxKeys()); + this.setContinuationToken(raw.getContinuationToken()); + this.setPrefix(raw.getPrefix()); + this.setTruncated(raw.isTruncated()); + } + + @Override + public List getObjectSummaries() { + return customListing; + } + + @Override + public List getCommonPrefixes() { + return customPrefixes; + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java index 8efa218..d9f059b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java @@ -19,8 +19,6 @@ package org.apache.hadoop.fs.s3a; import com.amazonaws.AmazonClientException; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.FileStatus; @@ -90,7 +88,7 @@ public class Listing { */ FileStatusListingIterator createFileStatusListingIterator( Path listPath, - ListObjectsRequest request, + S3ListRequest request, PathFilter filter, Listing.FileStatusAcceptor acceptor) throws IOException { return createFileStatusListingIterator(listPath, request, filter, acceptor, @@ -112,7 +110,7 @@ public class Listing { */ FileStatusListingIterator createFileStatusListingIterator( Path listPath, - ListObjectsRequest request, + S3ListRequest request, PathFilter filter, Listing.FileStatusAcceptor acceptor, RemoteIterator providedStatus) throws IOException { @@ -432,7 +430,7 @@ public class Listing { * @param objects the next object listing * @return true if this added any entries after filtering */ - private boolean buildNextStatusBatch(ObjectListing objects) { + private boolean buildNextStatusBatch(S3ListResult objects) { // counters for debug logs int added = 0, ignored = 0; // list to fill 
in with results. Initial size will be list maximum. @@ -512,13 +510,16 @@ public class Listing { * * Thread safety: none. */ - class ObjectListingIterator implements RemoteIterator { + class ObjectListingIterator implements RemoteIterator { /** The path listed. */ private final Path listPath; /** The most recent listing results. */ - private ObjectListing objects; + private S3ListResult objects; + + /** The most recent listing request. */ + private S3ListRequest request; /** Indicator that this is the first listing. */ private boolean firstListing = true; @@ -542,10 +543,11 @@ public class Listing { * */ ObjectListingIterator( Path listPath, - ListObjectsRequest request) { + S3ListRequest request) { this.listPath = listPath; this.maxKeys = owner.getMaxKeys(); this.objects = owner.listObjects(request); + this.request = request; } /** @@ -569,7 +571,7 @@ public class Listing { * @throws NoSuchElementException if there is no more data to list. */ @Override - public ObjectListing next() throws IOException { + public S3ListResult next() throws IOException { if (firstListing) { // on the first listing, don't request more data. // Instead just clear the firstListing flag so that it future calls @@ -585,7 +587,7 @@ public class Listing { // need to request a new set of objects. LOG.debug("[{}], Requesting next {} objects under {}", listingCount, maxKeys, listPath); - objects = owner.continueListObjects(objects); + objects = owner.continueListObjects(request, objects); listingCount++; LOG.debug("New listing status: {}", this); } catch (AmazonClientException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index c22383a..e76ef0b 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -53,8 +53,8 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; import com.amazonaws.services.s3.model.MultiObjectDeleteException; -import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.PutObjectRequest; @@ -167,6 +167,7 @@ public class S3AFileSystem extends FileSystem { private String blockOutputBuffer; private S3ADataBlocks.BlockFactory blockFactory; private int blockOutputActiveBlocks; + private boolean useListV1; /** Add any deprecated keys. 
*/ @SuppressWarnings("deprecation") @@ -261,6 +262,13 @@ public class S3AFileSystem extends FileSystem { BlockingThreadPoolExecutorService.newDaemonThreadFactory( "s3a-transfer-unbounded")); + int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); + if (listVersion < 1 || listVersion > 2) { + LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " + + "version 2", listVersion); + } + useListV1 = (listVersion == 1); + initTransferManager(); initCannedAcls(conf); @@ -1056,21 +1064,37 @@ public class S3AFileSystem extends FileSystem { * @param request request to initiate * @return the results */ - protected ObjectListing listObjects(ListObjectsRequest request) { + protected S3ListResult listObjects(S3ListRequest request) { incrementStatistic(OBJECT_LIST_REQUESTS); incrementReadOperations(); - return s3.listObjects(request); + if (useListV1) { + Preconditions.checkArgument(request.isV1()); + return S3ListResult.v1(s3.listObjects(request.getV1())); + } else { + Preconditions.checkArgument(!request.isV1()); + return S3ListResult.v2(s3.listObjectsV2(request.getV2())); + } } /** * List the next set of objects. - * @param objects paged result + * @param request last list objects request to continue + * @param prevResult last paged result to continue from * @return the next result object */ - protected ObjectListing continueListObjects(ObjectListing objects) { + protected S3ListResult continueListObjects(S3ListRequest request, + S3ListResult prevResult) { incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS); incrementReadOperations(); - return s3.listNextBatchOfObjects(objects); + if (useListV1) { + Preconditions.checkArgument(request.isV1()); + return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1())); + } else { + Preconditions.checkArgument(!request.isV1()); + request.getV2().setContinuationToken(prevResult.getV2() + .getNextContinuationToken()); + return S3ListResult.v2(s3.listObjectsV2(request.getV2())); + } } /** @@ -1464,9 +1488,9 @@ public class S3AFileSystem extends FileSystem { } else { LOG.debug("Getting objects for directory prefix {} to delete", key); - ListObjectsRequest request = createListObjectsRequest(key, null); + S3ListRequest request = createListObjectsRequest(key, null); - ObjectListing objects = listObjects(request); + S3ListResult objects = listObjects(request); List keys = new ArrayList<>(objects.getObjectSummaries().size()); while (true) { @@ -1481,7 +1505,7 @@ public class S3AFileSystem extends FileSystem { } if (objects.isTruncated()) { - objects = continueListObjects(objects); + objects = continueListObjects(request, objects); } else { if (!keys.isEmpty()) { // TODO: HADOOP-13761 S3Guard: retries @@ -1589,7 +1613,7 @@ public class S3AFileSystem extends FileSystem { return S3Guard.dirMetaToStatuses(dirMeta); } - ListObjectsRequest request = createListObjectsRequest(key, "/"); + S3ListRequest request = createListObjectsRequest(key, "/"); LOG.debug("listStatus: doing listObjects for directory {}", key); Listing.FileStatusListingIterator files = @@ -1619,16 +1643,38 @@ public class S3AFileSystem extends FileSystem { * @return the request */ @VisibleForTesting - ListObjectsRequest createListObjectsRequest(String key, + S3ListRequest createListObjectsRequest(String key, String delimiter) { - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setMaxKeys(maxKeys); - request.setPrefix(key); - if (delimiter != null) { - request.setDelimiter(delimiter); + return createListObjectsRequest(key, delimiter, 
null); + } + + private S3ListRequest createListObjectsRequest(String key, + String delimiter, Integer overrideMaxKeys) { + if (!useListV1) { + ListObjectsV2Request request = + new ListObjectsV2Request().withBucketName(bucket) + .withMaxKeys(maxKeys) + .withPrefix(key); + if (delimiter != null) { + request.setDelimiter(delimiter); + } + if (overrideMaxKeys != null) { + request.setMaxKeys(overrideMaxKeys); + } + return S3ListRequest.v2(request); + } else { + ListObjectsRequest request = new ListObjectsRequest(); + request.setBucketName(bucket); + request.setMaxKeys(maxKeys); + request.setPrefix(key); + if (delimiter != null) { + request.setDelimiter(delimiter); + } + if (overrideMaxKeys != null) { + request.setMaxKeys(overrideMaxKeys); + } + return S3ListRequest.v1(request); } - return request; } /** @@ -1885,13 +1931,9 @@ public class S3AFileSystem extends FileSystem { try { key = maybeAddTrailingSlash(key); - ListObjectsRequest request = new ListObjectsRequest(); - request.setBucketName(bucket); - request.setPrefix(key); - request.setDelimiter("/"); - request.setMaxKeys(1); + S3ListRequest request = createListObjectsRequest(key, "/", 1); - ObjectListing objects = listObjects(request); + S3ListResult objects = listObjects(request); Collection prefixes = objects.getCommonPrefixes(); Collection summaries = objects.getObjectSummaries(); @@ -2441,6 +2483,7 @@ public class S3AFileSystem extends FileSystem { } sb.append(", metastore=").append(metadataStore); sb.append(", authoritative=").append(allowAuthoritative); + sb.append(", useListV1=").append(useListV1); sb.append(", boundedExecutor=").append(boundedThreadPool); sb.append(", unboundedExecutor=").append(unboundedThreadPool); sb.append(", statistics {") http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java new file mode 100644 index 0000000..6b3bd46 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; + +/** + * API version-independent container for S3 List requests. 
+ */ +public class S3ListRequest { + private ListObjectsRequest v1Request; + private ListObjectsV2Request v2Request; + + protected S3ListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) { + v1Request = v1; + v2Request = v2; + } + + /** + * Restricted constructors to ensure v1 or v2, not both. + * @param request v1 request + * @return new list request container + */ + public static S3ListRequest v1(ListObjectsRequest request) { + return new S3ListRequest(request, null); + } + + /** + * Restricted constructors to ensure v1 or v2, not both. + * @param request v2 request + * @return new list request container + */ + public static S3ListRequest v2(ListObjectsV2Request request) { + return new S3ListRequest(null, request); + } + + /** + * Is this a v1 API request or v2? + * @return true if v1, false if v2 + */ + public boolean isV1() { + return v1Request != null; + } + + public ListObjectsRequest getV1() { + return v1Request; + } + + public ListObjectsV2Request getV2() { + return v2Request; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java new file mode 100644 index 0000000..e8aff32 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import com.amazonaws.services.s3.model.ListObjectsV2Result; +import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.S3ObjectSummary; + +import java.util.List; + +/** + * API version-independent container for S3 List responses. + */ +public class S3ListResult { + private ObjectListing v1Result; + private ListObjectsV2Result v2Result; + + protected S3ListResult(ObjectListing v1, ListObjectsV2Result v2) { + v1Result = v1; + v2Result = v2; + } + + /** + * Restricted constructors to ensure v1 or v2, not both. + * @param result v1 result + * @return new list result container + */ + public static S3ListResult v1(ObjectListing result) { + return new S3ListResult(result, null); + } + + /** + * Restricted constructors to ensure v1 or v2, not both. + * @param result v2 result + * @return new list result container + */ + public static S3ListResult v2(ListObjectsV2Result result) { + return new S3ListResult(null, result); + } + + /** + * Is this a v1 API result or v2? 
+ * @return true if v1, false if v2 + */ + public boolean isV1() { + return v1Result != null; + } + + public ObjectListing getV1() { + return v1Result; + } + + public ListObjectsV2Result getV2() { + return v2Result; + } + + public List getObjectSummaries() { + if (isV1()) { + return v1Result.getObjectSummaries(); + } else { + return v2Result.getObjectSummaries(); + } + } + + public boolean isTruncated() { + if (isV1()) { + return v1Result.isTruncated(); + } else { + return v2Result.isTruncated(); + } + } + + public List getCommonPrefixes() { + if (isV1()) { + return v1Result.getCommonPrefixes(); + } else { + return v2Result.getCommonPrefixes(); + } + + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md index b8d37c6..ffae1e9 100644 --- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md +++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md @@ -895,6 +895,15 @@ from placing its declaration on the command line. any call to setReadahead() is made to an open stream. + + fs.s3a.list.version + 2 + + Select which version of the S3 SDK's List Objects API to use. Currently + support 2 (default) and 1 (older API). + + + ### Configuring different S3 buckets Different S3 buckets can be accessed with different S3A client configurations. http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java new file mode 100644 index 0000000..5275336 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest; +import org.apache.hadoop.fs.contract.AbstractFSContract; +import org.apache.hadoop.fs.contract.s3a.S3AContract; + +import static org.apache.hadoop.fs.s3a.Constants.LIST_VERSION; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching; +import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard; + +/** + * S3A contract tests for getFileStatus, using the v1 List Objects API. + */ +public class ITestS3AContractGetFileStatusV1List + extends AbstractContractGetFileStatusTest { + + + @Override + protected AbstractFSContract createContract(Configuration conf) { + return new S3AContract(conf); + } + + @Override + public void teardown() throws Exception { + getLog().info("FS details {}", getFileSystem()); + super.teardown(); + } + + @Override + protected Configuration createConfiguration() { + Configuration conf = super.createConfiguration(); + disableFilesystemCaching(conf); + conf.setInt(Constants.MAX_PAGING_KEYS, 2); + maybeEnableS3Guard(conf); + + // Use v1 List Objects API + conf.setInt(LIST_VERSION, 1); + return conf; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java index 6cff533..da7699e 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java @@ -18,7 +18,7 @@ package org.apache.hadoop.fs.s3a; -import com.amazonaws.services.s3.model.ObjectListing; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.AmazonS3; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -488,6 +488,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase { @Test public void testInconsistentS3ClientDeletes() throws Throwable { + // Test only implemented for v2 S3 list API + Assume.assumeTrue(getConfiguration() + .getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2); + S3AFileSystem fs = getFileSystem(); Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING); for (int i = 0; i < 3; i++) { @@ -502,17 +506,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase { AmazonS3 client = fs.getAmazonS3Client(); String key = fs.pathToKey(root) + "/"; - ObjectListing preDeleteDelimited = client.listObjects( - fs.createListObjectsRequest(key, "/")); - ObjectListing preDeleteUndelimited = client.listObjects( - fs.createListObjectsRequest(key, null)); + ListObjectsV2Result preDeleteDelimited = client.listObjectsV2( + fs.createListObjectsRequest(key, "/").getV2()); + ListObjectsV2Result preDeleteUndelimited = client.listObjectsV2( + fs.createListObjectsRequest(key, null).getV2()); fs.delete(root, true); - ObjectListing postDeleteDelimited = client.listObjects( - fs.createListObjectsRequest(key, "/")); - ObjectListing postDeleteUndelimited = client.listObjects( - fs.createListObjectsRequest(key, null)); + ListObjectsV2Result postDeleteDelimited = client.listObjectsV2( + 
fs.createListObjectsRequest(key, "/").getV2()); + ListObjectsV2Result postDeleteUndelimited = client.listObjectsV2( + fs.createListObjectsRequest(key, null).getV2()); assertEquals("InconsistentAmazonS3Client added back objects incorrectly " + "in a non-recursive listing", http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java index 58e4d30..586264d 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java @@ -25,9 +25,12 @@ import static org.mockito.Mockito.*; import java.io.FileNotFoundException; import java.util.Collections; import java.util.Date; +import java.util.List; import com.amazonaws.services.s3.model.GetObjectMetadataRequest; import com.amazonaws.services.s3.model.ListObjectsRequest; +import com.amazonaws.services.s3.model.ListObjectsV2Request; +import com.amazonaws.services.s3.model.ListObjectsV2Result; import com.amazonaws.services.s3.model.ObjectListing; import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.S3ObjectSummary; @@ -93,12 +96,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { when(s3.getObjectMetadata(argThat( correctGetMetadataRequest(BUCKET, key + "/")) )).thenThrow(NOT_FOUND); - ObjectListing objects = mock(ObjectListing.class); - when(objects.getCommonPrefixes()).thenReturn( - Collections.singletonList("dir/")); - when(objects.getObjectSummaries()).thenReturn( - Collections.emptyList()); - when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + setupListMocks(Collections.singletonList("dir/"), Collections.emptyList()); FileStatus stat = fs.getFileStatus(path); assertNotNull(stat); assertEquals(fs.makeQualified(path), stat.getPath()); @@ -118,12 +116,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { when(s3.getObjectMetadata(argThat( correctGetMetadataRequest(BUCKET, key + "/") ))).thenThrow(NOT_FOUND); - ObjectListing objects = mock(ObjectListing.class); - when(objects.getCommonPrefixes()).thenReturn( - Collections.emptyList()); - when(objects.getObjectSummaries()).thenReturn( - Collections.emptyList()); - when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + setupListMocks(Collections.emptyList(), Collections.emptyList()); FileStatus stat = fs.getFileStatus(path); assertNotNull(stat); assertEquals(fs.makeQualified(path), stat.getPath()); @@ -140,16 +133,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest { when(s3.getObjectMetadata(argThat( correctGetMetadataRequest(BUCKET, key + "/") ))).thenThrow(NOT_FOUND); - ObjectListing objects = mock(ObjectListing.class); - when(objects.getCommonPrefixes()).thenReturn( - Collections.emptyList()); - when(objects.getObjectSummaries()).thenReturn( - Collections.emptyList()); - when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + setupListMocks(Collections.emptyList(), Collections.emptyList()); exception.expect(FileNotFoundException.class); fs.getFileStatus(path); } + private void setupListMocks(List prefixes, + List summaries) { + + // V1 list API mock + ObjectListing objects = 
mock(ObjectListing.class); + when(objects.getCommonPrefixes()).thenReturn(prefixes); + when(objects.getObjectSummaries()).thenReturn(summaries); + when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects); + + // V2 list API mock + ListObjectsV2Result v2Result = mock(ListObjectsV2Result.class); + when(v2Result.getCommonPrefixes()).thenReturn(prefixes); + when(v2Result.getObjectSummaries()).thenReturn(summaries); + when(s3.listObjectsV2(any(ListObjectsV2Request.class))) + .thenReturn(v2Result); + } + private Matcher correctGetMetadataRequest( String bucket, String key) { return new BaseMatcher() {
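For reference, a minimal sketch — not taken from the commit — of how the new version-independent S3ListRequest and S3ListResult containers can drive a paged v2 listing. The AmazonS3 client, bucket, and prefix are placeholders, and the statistics counters, error translation, and v1 branch that S3AFileSystem adds around these calls are omitted:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import org.apache.hadoop.fs.s3a.S3ListRequest;
import org.apache.hadoop.fs.s3a.S3ListResult;

public class S3ListPagingSketch {
  public static void listAllKeys(AmazonS3 s3, String bucket, String prefix) {
    // Wrap a v2 request in the version-independent container.
    S3ListRequest request = S3ListRequest.v2(
        new ListObjectsV2Request().withBucketName(bucket).withPrefix(prefix));
    S3ListResult result = S3ListResult.v2(s3.listObjectsV2(request.getV2()));
    while (true) {
      for (S3ObjectSummary summary : result.getObjectSummaries()) {
        System.out.println(summary.getKey());
      }
      if (!result.isTruncated()) {
        break;
      }
      // v2 paging: carry the continuation token on the request, as
      // S3AFileSystem.continueListObjects() does, instead of the v1
      // listNextBatchOfObjects(ObjectListing) call.
      request.getV2().setContinuationToken(
          result.getV2().getNextContinuationToken());
      result = S3ListResult.v2(s3.listObjectsV2(request.getV2()));
    }
  }
}

Inside S3AFileSystem the choice between this path and the v1 one is made once from fs.s3a.list.version, so callers such as Listing.ObjectListingIterator only ever see S3ListRequest and S3ListResult.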