hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From inigo...@apache.org
Subject [13/45] hadoop git commit: HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A. Contributed by Aaron Fabbri
Date Fri, 08 Sep 2017 20:57:45 GMT
HADOOP-13421. Switch to v2 of the S3 List Objects API in S3A.
Contributed by Aaron Fabbri


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5bbca804
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5bbca804
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5bbca804

Branch: refs/heads/HDFS-10467
Commit: 5bbca80428ffbe776650652de86a3bba885edb31
Parents: ab8368d
Author: Steve Loughran <stevel@apache.org>
Authored: Fri Sep 8 12:07:02 2017 +0100
Committer: Steve Loughran <stevel@apache.org>
Committed: Fri Sep 8 12:07:02 2017 +0100

----------------------------------------------------------------------
 .../src/main/resources/core-default.xml         |   9 ++
 .../org/apache/hadoop/fs/s3a/Constants.java     |   9 ++
 .../fs/s3a/InconsistentAmazonS3Client.java      | 143 ++++++++++++++++---
 .../java/org/apache/hadoop/fs/s3a/Listing.java  |  22 +--
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  91 ++++++++----
 .../org/apache/hadoop/fs/s3a/S3ListRequest.java |  69 +++++++++
 .../org/apache/hadoop/fs/s3a/S3ListResult.java  |  97 +++++++++++++
 .../src/site/markdown/tools/hadoop-aws/index.md |   9 ++
 .../ITestS3AContractGetFileStatusV1List.java    |  59 ++++++++
 .../fs/s3a/ITestS3GuardListConsistency.java     |  22 +--
 .../hadoop/fs/s3a/TestS3AGetFileStatus.java     |  41 +++---
 11 files changed, 492 insertions(+), 79 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 9e2c553..23739b0 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -1428,6 +1428,15 @@
   <description>The implementation class of the S3A AbstractFileSystem.</description>
 </property>
 
+<property>
+  <name>fs.s3a.list.version</name>
+  <value>2</value>
+  <description>
+    Select which version of the S3 SDK's List Objects API to use.  Supported
+    values are 2 (default) and 1 (older API).
+  </description>
+</property>
+
 <!-- Azure file system properties -->
 <property>
   <name>fs.wasb.impl</name>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 1a464d0..4e2af3a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -451,4 +451,13 @@ public final class Constants {
   public static final String FAIL_INJECT_INCONSISTENCY_PROBABILITY =
       "fs.s3a.failinject.inconsistency.probability";
 
+  /**
+   * S3 API level parameters.
+   */
+  @InterfaceStability.Unstable
+  public static final String LIST_VERSION = "fs.s3a.list.version";
+
+  @InterfaceStability.Unstable
+  public static final int DEFAULT_LIST_VERSION = 2;
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 5e9cb3f..6476f5d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -28,6 +28,8 @@ import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
@@ -109,8 +111,10 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     }
   }
 
-  /** Map of key to delay -> time it was deleted + object summary (object
-   * summary is null for prefixes. */
+  /**
+   * Map of key to delay -> time it was deleted + object summary (object summary
+   * is null for prefixes).
+   */
   private Map<String, Delete> delayedDeletes = new HashMap<>();
 
   /** Map of key to delay -> time it was created. */
@@ -196,17 +200,29 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     return super.putObject(putObjectRequest);
   }
 
-  /* We should only need to override this version of listObjects() */
+  /* We should only need to override these versions of listObjects() */
   @Override
   public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
       throws AmazonClientException, AmazonServiceException {
     LOG.debug("prefix {}", listObjectsRequest.getPrefix());
     ObjectListing listing = super.listObjects(listObjectsRequest);
-    listing = filterListObjects(listObjectsRequest, listing);
+    listing = filterListObjects(listing);
     listing = restoreListObjects(listObjectsRequest, listing);
     return listing;
   }
 
+  /* V2 counterpart of the listObjects() override above. */
+  @Override
+  public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
+      throws AmazonClientException, AmazonServiceException {
+    LOG.debug("prefix {}", request.getPrefix());
+    ListObjectsV2Result listing = super.listObjectsV2(request);
+    listing = filterListObjectsV2(listing);
+    listing = restoreListObjectsV2(request, listing);
+    return listing;
+  }
+
+
   private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
       S3ObjectSummary item) {
     // Behavior of S3ObjectSummary
@@ -282,21 +298,58 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     // recursive list has no delimiter, returns everything that matches a
     // prefix.
     boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+    String prefix = request.getPrefix();
+
+    restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
+    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+  }
+
+  /**
+   * V2 list API variant of
+   * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)}.
+   * @param request original v2 list request
+   * @param result raw s3 result
+   */
+  private ListObjectsV2Result restoreListObjectsV2(ListObjectsV2Request request,
+      ListObjectsV2Result result) {
+    List<S3ObjectSummary> outputList = result.getObjectSummaries();
+    List<String> outputPrefixes = result.getCommonPrefixes();
+    // recursive list has no delimiter, returns everything that matches a
+    // prefix.
+    boolean recursiveObjectList = !("/".equals(request.getDelimiter()));
+    String prefix = request.getPrefix();
+
+    restoreDeleted(outputList, outputPrefixes, recursiveObjectList, prefix);
+    return new CustomListObjectsV2Result(result, outputList, outputPrefixes);
+  }
+
+
+  /**
+   * Main logic for
+   * {@link #restoreListObjects(ListObjectsRequest, ObjectListing)} and
+   * the v2 variant above.
+   * @param summaries object summary list to modify.
+   * @param prefixes prefix list to modify
+   * @param recursive true if recursive list request
+   * @param prefix prefix for original list request
+   */
+  private void restoreDeleted(List<S3ObjectSummary> summaries,
+      List<String> prefixes, boolean recursive, String prefix) {
 
     // Go through all deleted keys
     for (String key : new HashSet<>(delayedDeletes.keySet())) {
       Delete delete = delayedDeletes.get(key);
       if (isKeyDelayed(delete.time(), key)) {
-        if (isDescendant(request.getPrefix(), key, recursiveObjectList)) {
+        if (isDescendant(prefix, key, recursive)) {
           if (delete.summary() != null) {
-            addSummaryIfNotPresent(outputList, delete.summary());
+            addSummaryIfNotPresent(summaries, delete.summary());
           }
         }
         // Non-recursive list has delimiter: will return rolled-up prefixes for
         // all keys that are not direct children
-        if (!recursiveObjectList) {
-          if (isDescendant(request.getPrefix(), key, true)) {
-            addPrefixIfNotPresent(outputPrefixes, request.getPrefix(), key);
+        if (!recursive) {
+          if (isDescendant(prefix, key, true)) {
+            addPrefixIfNotPresent(prefixes, prefix, key);
           }
         }
       } else {
@@ -304,31 +357,52 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
         delayedDeletes.remove(key);
       }
     }
+  }
+
+  private ObjectListing filterListObjects(ObjectListing rawListing) {
+
+    // Filter object listing
+    List<S3ObjectSummary> outputList = filterSummaries(
+        rawListing.getObjectSummaries());
+
+    // Filter prefixes (directories)
+    List<String> outputPrefixes = filterPrefixes(
+        rawListing.getCommonPrefixes());
 
     return new CustomObjectListing(rawListing, outputList, outputPrefixes);
   }
 
-  private ObjectListing filterListObjects(ListObjectsRequest request,
-      ObjectListing rawListing) {
-
+  private ListObjectsV2Result filterListObjectsV2(ListObjectsV2Result raw) {
     // Filter object listing
+    List<S3ObjectSummary> outputList = filterSummaries(
+        raw.getObjectSummaries());
+
+    // Filter prefixes (directories)
+    List<String> outputPrefixes = filterPrefixes(raw.getCommonPrefixes());
+
+    return new CustomListObjectsV2Result(raw, outputList, outputPrefixes);
+  }
+
+  private List<S3ObjectSummary> filterSummaries(
+      List<S3ObjectSummary> summaries) {
     List<S3ObjectSummary> outputList = new ArrayList<>();
-    for (S3ObjectSummary s : rawListing.getObjectSummaries()) {
+    for (S3ObjectSummary s : summaries) {
       String key = s.getKey();
       if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
         outputList.add(s);
       }
     }
+    return outputList;
+  }
 
-    // Filter prefixes (directories)
+  private List<String> filterPrefixes(List<String> prefixes) {
     List<String> outputPrefixes = new ArrayList<>();
-    for (String key : rawListing.getCommonPrefixes()) {
+    for (String key : prefixes) {
       if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
         outputPrefixes.add(key);
       }
     }
-
-    return new CustomObjectListing(rawListing, outputList, outputPrefixes);
+    return outputPrefixes;
   }
 
   private boolean isKeyDelayed(Long enqueueTime, String key) {
@@ -342,7 +416,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       delayedDeletes.remove(key);
       LOG.debug("no longer delaying {}", key);
       return false;
-    } else  {
+    } else {
       LOG.info("delaying {}", key);
       return true;
     }
@@ -431,4 +505,37 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       return customPrefixes;
     }
   }
+
+  private static class CustomListObjectsV2Result extends ListObjectsV2Result {
+
+    private final List<S3ObjectSummary> customListing;
+    private final List<String> customPrefixes;
+
+    CustomListObjectsV2Result(ListObjectsV2Result raw,
+        List<S3ObjectSummary> customListing, List<String> customPrefixes) {
+      super();
+      this.customListing = customListing;
+      this.customPrefixes = customPrefixes;
+
+      this.setBucketName(raw.getBucketName());
+      this.setCommonPrefixes(raw.getCommonPrefixes());
+      this.setDelimiter(raw.getDelimiter());
+      this.setEncodingType(raw.getEncodingType());
+      this.setStartAfter(raw.getStartAfter());
+      this.setMaxKeys(raw.getMaxKeys());
+      this.setContinuationToken(raw.getContinuationToken());
+      this.setPrefix(raw.getPrefix());
+      this.setTruncated(raw.isTruncated());
+    }
+
+    @Override
+    public List<S3ObjectSummary> getObjectSummaries() {
+      return customListing;
+    }
+
+    @Override
+    public List<String> getCommonPrefixes() {
+      return customPrefixes;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index 8efa218..d9f059b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -19,8 +19,6 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.services.s3.model.ListObjectsRequest;
-import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileStatus;
@@ -90,7 +88,7 @@ public class Listing {
    */
   FileStatusListingIterator createFileStatusListingIterator(
       Path listPath,
-      ListObjectsRequest request,
+      S3ListRequest request,
       PathFilter filter,
       Listing.FileStatusAcceptor acceptor) throws IOException {
     return createFileStatusListingIterator(listPath, request, filter, acceptor,
@@ -112,7 +110,7 @@ public class Listing {
    */
   FileStatusListingIterator createFileStatusListingIterator(
       Path listPath,
-      ListObjectsRequest request,
+      S3ListRequest request,
       PathFilter filter,
       Listing.FileStatusAcceptor acceptor,
       RemoteIterator<FileStatus> providedStatus) throws IOException {
@@ -432,7 +430,7 @@ public class Listing {
      * @param objects the next object listing
      * @return true if this added any entries after filtering
      */
-    private boolean buildNextStatusBatch(ObjectListing objects) {
+    private boolean buildNextStatusBatch(S3ListResult objects) {
       // counters for debug logs
       int added = 0, ignored = 0;
       // list to fill in with results. Initial size will be list maximum.
@@ -512,13 +510,16 @@ public class Listing {
    *
    * Thread safety: none.
    */
-  class ObjectListingIterator implements RemoteIterator<ObjectListing> {
+  class ObjectListingIterator implements RemoteIterator<S3ListResult> {
 
     /** The path listed. */
     private final Path listPath;
 
     /** The most recent listing results. */
-    private ObjectListing objects;
+    private S3ListResult objects;
+
+    /** The most recent listing request. */
+    private S3ListRequest request;
 
     /** Indicator that this is the first listing. */
     private boolean firstListing = true;
@@ -542,10 +543,11 @@ public class Listing {
      * */
     ObjectListingIterator(
         Path listPath,
-        ListObjectsRequest request) {
+        S3ListRequest request) {
       this.listPath = listPath;
       this.maxKeys = owner.getMaxKeys();
       this.objects = owner.listObjects(request);
+      this.request = request;
     }
 
     /**
@@ -569,7 +571,7 @@ public class Listing {
      * @throws NoSuchElementException if there is no more data to list.
      */
     @Override
-    public ObjectListing next() throws IOException {
+    public S3ListResult next() throws IOException {
       if (firstListing) {
         // on the first listing, don't request more data.
         // Instead just clear the firstListing flag so that it future calls
@@ -585,7 +587,7 @@ public class Listing {
           // need to request a new set of objects.
           LOG.debug("[{}], Requesting next {} objects under {}",
               listingCount, maxKeys, listPath);
-          objects = owner.continueListObjects(objects);
+          objects = owner.continueListObjects(request, objects);
           listingCount++;
           LOG.debug("New listing status: {}", this);
         } catch (AmazonClientException e) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index c22383a..e76ef0b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -53,8 +53,8 @@ import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.MultiObjectDeleteException;
-import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.PutObjectRequest;
@@ -167,6 +167,7 @@ public class S3AFileSystem extends FileSystem {
   private String blockOutputBuffer;
   private S3ADataBlocks.BlockFactory blockFactory;
   private int blockOutputActiveBlocks;
+  private boolean useListV1;
 
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
@@ -261,6 +262,13 @@ public class S3AFileSystem extends FileSystem {
           BlockingThreadPoolExecutorService.newDaemonThreadFactory(
               "s3a-transfer-unbounded"));
 
+      int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
+      if (listVersion < 1 || listVersion > 2) {
+        LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
+            "version 2", listVersion);
+      }
+      useListV1 = (listVersion == 1);
+
       initTransferManager();
 
       initCannedAcls(conf);
@@ -1056,21 +1064,37 @@ public class S3AFileSystem extends FileSystem {
    * @param request request to initiate
    * @return the results
    */
-  protected ObjectListing listObjects(ListObjectsRequest request) {
+  protected S3ListResult listObjects(S3ListRequest request) {
     incrementStatistic(OBJECT_LIST_REQUESTS);
     incrementReadOperations();
-    return s3.listObjects(request);
+    if (useListV1) {
+      Preconditions.checkArgument(request.isV1());
+      return S3ListResult.v1(s3.listObjects(request.getV1()));
+    } else {
+      Preconditions.checkArgument(!request.isV1());
+      return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+    }
   }
 
   /**
    * List the next set of objects.
-   * @param objects paged result
+   * @param request last list objects request to continue
+   * @param prevResult last paged result to continue from
    * @return the next result object
    */
-  protected ObjectListing continueListObjects(ObjectListing objects) {
+  protected S3ListResult continueListObjects(S3ListRequest request,
+      S3ListResult prevResult) {
     incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS);
     incrementReadOperations();
-    return s3.listNextBatchOfObjects(objects);
+    if (useListV1) {
+      Preconditions.checkArgument(request.isV1());
+      return S3ListResult.v1(s3.listNextBatchOfObjects(prevResult.getV1()));
+    } else {
+      Preconditions.checkArgument(!request.isV1());
+      request.getV2().setContinuationToken(prevResult.getV2()
+          .getNextContinuationToken());
+      return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
+    }
   }
 
   /**
@@ -1464,9 +1488,9 @@ public class S3AFileSystem extends FileSystem {
       } else {
         LOG.debug("Getting objects for directory prefix {} to delete", key);
 
-        ListObjectsRequest request = createListObjectsRequest(key, null);
+        S3ListRequest request = createListObjectsRequest(key, null);
 
-        ObjectListing objects = listObjects(request);
+        S3ListResult objects = listObjects(request);
         List<DeleteObjectsRequest.KeyVersion> keys =
             new ArrayList<>(objects.getObjectSummaries().size());
         while (true) {
@@ -1481,7 +1505,7 @@ public class S3AFileSystem extends FileSystem {
           }
 
           if (objects.isTruncated()) {
-            objects = continueListObjects(objects);
+            objects = continueListObjects(request, objects);
           } else {
             if (!keys.isEmpty()) {
               // TODO: HADOOP-13761 S3Guard: retries
@@ -1589,7 +1613,7 @@ public class S3AFileSystem extends FileSystem {
         return S3Guard.dirMetaToStatuses(dirMeta);
       }
 
-      ListObjectsRequest request = createListObjectsRequest(key, "/");
+      S3ListRequest request = createListObjectsRequest(key, "/");
       LOG.debug("listStatus: doing listObjects for directory {}", key);
 
       Listing.FileStatusListingIterator files =
@@ -1619,16 +1643,38 @@ public class S3AFileSystem extends FileSystem {
    * @return the request
    */
   @VisibleForTesting
-  ListObjectsRequest createListObjectsRequest(String key,
+  S3ListRequest createListObjectsRequest(String key,
       String delimiter) {
-    ListObjectsRequest request = new ListObjectsRequest();
-    request.setBucketName(bucket);
-    request.setMaxKeys(maxKeys);
-    request.setPrefix(key);
-    if (delimiter != null) {
-      request.setDelimiter(delimiter);
+    return createListObjectsRequest(key, delimiter, null);
+  }
+
+  private S3ListRequest createListObjectsRequest(String key,
+      String delimiter, Integer overrideMaxKeys) {
+    if (!useListV1) {
+      ListObjectsV2Request request =
+          new ListObjectsV2Request().withBucketName(bucket)
+              .withMaxKeys(maxKeys)
+              .withPrefix(key);
+      if (delimiter != null) {
+        request.setDelimiter(delimiter);
+      }
+      if (overrideMaxKeys != null) {
+        request.setMaxKeys(overrideMaxKeys);
+      }
+      return S3ListRequest.v2(request);
+    } else {
+      ListObjectsRequest request = new ListObjectsRequest();
+      request.setBucketName(bucket);
+      request.setMaxKeys(maxKeys);
+      request.setPrefix(key);
+      if (delimiter != null) {
+        request.setDelimiter(delimiter);
+      }
+      if (overrideMaxKeys != null) {
+        request.setMaxKeys(overrideMaxKeys);
+      }
+      return S3ListRequest.v1(request);
     }
-    return request;
   }
 
   /**
@@ -1885,13 +1931,9 @@ public class S3AFileSystem extends FileSystem {
 
     try {
       key = maybeAddTrailingSlash(key);
-      ListObjectsRequest request = new ListObjectsRequest();
-      request.setBucketName(bucket);
-      request.setPrefix(key);
-      request.setDelimiter("/");
-      request.setMaxKeys(1);
+      S3ListRequest request = createListObjectsRequest(key, "/", 1);
 
-      ObjectListing objects = listObjects(request);
+      S3ListResult objects = listObjects(request);
 
       Collection<String> prefixes = objects.getCommonPrefixes();
       Collection<S3ObjectSummary> summaries = objects.getObjectSummaries();
@@ -2441,6 +2483,7 @@ public class S3AFileSystem extends FileSystem {
     }
     sb.append(", metastore=").append(metadataStore);
     sb.append(", authoritative=").append(allowAuthoritative);
+    sb.append(", useListV1=").append(useListV1);
     sb.append(", boundedExecutor=").append(boundedThreadPool);
     sb.append(", unboundedExecutor=").append(unboundedThreadPool);
     sb.append(", statistics {")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
new file mode 100644
index 0000000..6b3bd46
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListRequest.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+
+/**
+ * API version-independent container for S3 List requests.
+ */
+public class S3ListRequest {
+  private ListObjectsRequest v1Request;
+  private ListObjectsV2Request v2Request;
+
+  protected S3ListRequest(ListObjectsRequest v1, ListObjectsV2Request v2) {
+    v1Request = v1;
+    v2Request = v2;
+  }
+
+  /**
+   * Restricted factory method: wraps a v1 request only, never both versions.
+   * @param request v1 request
+   * @return new list request container
+   */
+  public static S3ListRequest v1(ListObjectsRequest request) {
+    return new S3ListRequest(request, null);
+  }
+
+  /**
+   * Restricted factory method: wraps a v2 request only, never both versions.
+   * @param request v2 request
+   * @return new list request container
+   */
+  public static S3ListRequest v2(ListObjectsV2Request request) {
+    return new S3ListRequest(null, request);
+  }
+
+  /**
+   * Is this a v1 API request or v2?
+   * @return true if v1, false if v2
+   */
+  public boolean isV1() {
+    return v1Request != null;
+  }
+
+  public ListObjectsRequest getV1() {
+    return v1Request;
+  }
+
+  public ListObjectsV2Request getV2() {
+    return v2Request;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
new file mode 100644
index 0000000..e8aff32
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ListResult.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
+
+import java.util.List;
+
+/**
+ * API version-independent container for S3 List responses.
+ */
+public class S3ListResult {
+  private ObjectListing v1Result;
+  private ListObjectsV2Result v2Result;
+
+  protected S3ListResult(ObjectListing v1, ListObjectsV2Result v2) {
+    v1Result = v1;
+    v2Result = v2;
+  }
+
+  /**
+   * Restricted factory method: wraps a v1 result only, never both versions.
+   * @param result v1 result
+   * @return new list result container
+   */
+  public static S3ListResult v1(ObjectListing result) {
+    return new S3ListResult(result, null);
+  }
+
+  /**
+   * Restricted factory method: wraps a v2 result only, never both versions.
+   * @param result v2 result
+   * @return new list result container
+   */
+  public static S3ListResult v2(ListObjectsV2Result result) {
+    return new S3ListResult(null, result);
+  }
+
+  /**
+   * Is this a v1 API result or v2?
+   * @return true if v1, false if v2
+   */
+  public boolean isV1() {
+    return v1Result != null;
+  }
+
+  public ObjectListing getV1() {
+    return v1Result;
+  }
+
+  public ListObjectsV2Result getV2() {
+    return v2Result;
+  }
+
+  public List<S3ObjectSummary> getObjectSummaries() {
+    if (isV1()) {
+      return v1Result.getObjectSummaries();
+    } else {
+      return v2Result.getObjectSummaries();
+    }
+  }
+
+  public boolean isTruncated() {
+    if (isV1()) {
+      return v1Result.isTruncated();
+    } else {
+      return v2Result.isTruncated();
+    }
+  }
+
+  public List<String> getCommonPrefixes() {
+    if (isV1()) {
+      return v1Result.getCommonPrefixes();
+    } else {
+      return v2Result.getCommonPrefixes();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index b8d37c6..ffae1e9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -895,6 +895,15 @@ from placing its declaration on the command line.
       any call to setReadahead() is made to an open stream.</description>
     </property>
 
+    <property>
+      <name>fs.s3a.list.version</name>
+      <value>2</value>
+      <description>
+        Select which version of the S3 SDK's List Objects API to use.  Supported
+        values are 2 (default) and 1 (older API).
+      </description>
+    </property>
+
 ### Configuring different S3 buckets
 
 Different S3 buckets can be accessed with different S3A client configurations.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
new file mode 100644
index 0000000..5275336
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AContractGetFileStatusV1List.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+
+import static org.apache.hadoop.fs.s3a.Constants.LIST_VERSION;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.disableFilesystemCaching;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+
+/**
+ * Runs the S3A getFileStatus contract tests with the filesystem configured
+ * to use the v1 List Objects API instead of the v2 API.
+ */
+public class ITestS3AContractGetFileStatusV1List
+    extends AbstractContractGetFileStatusTest {
+
+  /**
+   * Build the configuration for this suite: start from the superclass
+   * configuration, disable filesystem caching, cap list paging at two
+   * keys, call {@code maybeEnableS3Guard}, then select list API version 1.
+   * @return the configuration used to create the filesystem under test
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration config = super.createConfiguration();
+    disableFilesystemCaching(config);
+    // Very small page size; presumably so listings need several list
+    // calls and the paging path is exercised -- TODO confirm intent.
+    config.setInt(Constants.MAX_PAGING_KEYS, 2);
+    maybeEnableS3Guard(config);
+    // Force the older v1 List Objects API for this suite.
+    config.setInt(LIST_VERSION, 1);
+    return config;
+  }
+
+  /**
+   * Create the S3A filesystem contract.
+   * @param conf configuration the contract is built from
+   * @return a new {@link S3AContract}
+   */
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  /**
+   * Log the filesystem's details, then delegate to the superclass teardown.
+   * @throws Exception if the teardown fails
+   */
+  @Override
+  public void teardown() throws Exception {
+    getLog().info("FS details {}", getFileSystem());
+    super.teardown();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index 6cff533..da7699e 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import com.amazonaws.services.s3.model.ObjectListing;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.AmazonS3;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -488,6 +488,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase
{
 
   @Test
   public void testInconsistentS3ClientDeletes() throws Throwable {
+    // Test only implemented for v2 S3 list API
+    Assume.assumeTrue(getConfiguration()
+        .getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
+
     S3AFileSystem fs = getFileSystem();
     Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
     for (int i = 0; i < 3; i++) {
@@ -502,17 +506,17 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase
{
     AmazonS3 client = fs.getAmazonS3Client();
     String key = fs.pathToKey(root) + "/";
 
-    ObjectListing preDeleteDelimited = client.listObjects(
-        fs.createListObjectsRequest(key, "/"));
-    ObjectListing preDeleteUndelimited = client.listObjects(
-        fs.createListObjectsRequest(key, null));
+    ListObjectsV2Result preDeleteDelimited = client.listObjectsV2(
+        fs.createListObjectsRequest(key, "/").getV2());
+    ListObjectsV2Result preDeleteUndelimited = client.listObjectsV2(
+        fs.createListObjectsRequest(key, null).getV2());
 
     fs.delete(root, true);
 
-    ObjectListing postDeleteDelimited = client.listObjects(
-        fs.createListObjectsRequest(key, "/"));
-    ObjectListing postDeleteUndelimited = client.listObjects(
-        fs.createListObjectsRequest(key, null));
+    ListObjectsV2Result postDeleteDelimited = client.listObjectsV2(
+        fs.createListObjectsRequest(key, "/").getV2());
+    ListObjectsV2Result postDeleteUndelimited = client.listObjectsV2(
+        fs.createListObjectsRequest(key, null).getV2());
 
     assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
             "in a non-recursive listing",

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5bbca804/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
index 58e4d30..586264d 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AGetFileStatus.java
@@ -25,9 +25,12 @@ import static org.mockito.Mockito.*;
 import java.io.FileNotFoundException;
 import java.util.Collections;
 import java.util.Date;
+import java.util.List;
 
 import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
+import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
@@ -93,12 +96,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     when(s3.getObjectMetadata(argThat(
       correctGetMetadataRequest(BUCKET, key + "/"))
     )).thenThrow(NOT_FOUND);
-    ObjectListing objects = mock(ObjectListing.class);
-    when(objects.getCommonPrefixes()).thenReturn(
-        Collections.singletonList("dir/"));
-    when(objects.getObjectSummaries()).thenReturn(
-        Collections.<S3ObjectSummary>emptyList());
-    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    setupListMocks(Collections.singletonList("dir/"), Collections.emptyList());
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -118,12 +116,7 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     when(s3.getObjectMetadata(argThat(
       correctGetMetadataRequest(BUCKET, key + "/")
     ))).thenThrow(NOT_FOUND);
-    ObjectListing objects = mock(ObjectListing.class);
-    when(objects.getCommonPrefixes()).thenReturn(
-        Collections.<String>emptyList());
-    when(objects.getObjectSummaries()).thenReturn(
-        Collections.<S3ObjectSummary>emptyList());
-    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    setupListMocks(Collections.emptyList(), Collections.emptyList());
     FileStatus stat = fs.getFileStatus(path);
     assertNotNull(stat);
     assertEquals(fs.makeQualified(path), stat.getPath());
@@ -140,16 +133,28 @@ public class TestS3AGetFileStatus extends AbstractS3AMockTest {
     when(s3.getObjectMetadata(argThat(
       correctGetMetadataRequest(BUCKET, key + "/")
     ))).thenThrow(NOT_FOUND);
-    ObjectListing objects = mock(ObjectListing.class);
-    when(objects.getCommonPrefixes()).thenReturn(
-        Collections.<String>emptyList());
-    when(objects.getObjectSummaries()).thenReturn(
-        Collections.<S3ObjectSummary>emptyList());
-    when(s3.listObjects(any(ListObjectsRequest.class))).thenReturn(objects);
+    setupListMocks(Collections.emptyList(), Collections.emptyList());
     exception.expect(FileNotFoundException.class);
     fs.getFileStatus(path);
   }
 
+  /**
+   * Stub both the v1 ({@code listObjects}) and v2 ({@code listObjectsV2})
+   * list calls on the mock S3 client. Each returns a mock result reporting
+   * the given common prefixes and object summaries, so the filesystem sees
+   * the same listing whichever API version it is configured to use.
+   * @param prefixes common prefixes the mocked listings report
+   * @param summaries object summaries the mocked listings report
+   */
+  private void setupListMocks(List<String> prefixes,
+      List<S3ObjectSummary> summaries) {
+    // v1 API: listObjects(ListObjectsRequest) -> ObjectListing
+    ObjectListing v1Listing = mock(ObjectListing.class);
+    when(v1Listing.getCommonPrefixes()).thenReturn(prefixes);
+    when(v1Listing.getObjectSummaries()).thenReturn(summaries);
+    when(s3.listObjects(any(ListObjectsRequest.class)))
+        .thenReturn(v1Listing);
+
+    // v2 API: listObjectsV2(ListObjectsV2Request) -> ListObjectsV2Result
+    ListObjectsV2Result v2Listing = mock(ListObjectsV2Result.class);
+    when(v2Listing.getCommonPrefixes()).thenReturn(prefixes);
+    when(v2Listing.getObjectSummaries()).thenReturn(summaries);
+    when(s3.listObjectsV2(any(ListObjectsV2Request.class)))
+        .thenReturn(v2Listing);
+  }
+
   private Matcher<GetObjectMetadataRequest> correctGetMetadataRequest(
       String bucket, String key) {
     return new BaseMatcher<GetObjectMetadataRequest>() {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message