hadoop-common-commits mailing list archives

From aengin...@apache.org
Subject [08/36] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.
Date Tue, 28 Nov 2017 21:36:43 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
new file mode 100644
index 0000000..c6dbf55
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md
@@ -0,0 +1,819 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Committing work to S3 with the "S3A Committers"
+
+<!-- MACRO{toc|fromDepth=0|toDepth=5} -->
+
+This page covers the S3A Committers, which can commit work directly
+to an S3 object store.
+
+These committers are designed to solve a fundamental problem which
+the standard committers cannot: the consistent, high-performance,
+and reliable commitment of output to S3.
+
+For details on their internal design, see
+[S3A Committers: Architecture and Implementation](./committer_architecture.html).
+
+
+## Introduction: The Commit Problem
+
+
+Apache Hadoop MapReduce (and behind the scenes, Apache Spark) often write
+the output of their work to filesystems.
+
+Normally, Hadoop uses the `FileOutputCommitter` to manage the
+promotion of files created in a single task attempt to the final output of
+a query. This is done in a way to handle failures of tasks and jobs, and to
+support speculative execution. It does that by listing directories and renaming
+their content into the final destination when tasks and then jobs are committed.
+
+This places some key requirements on the underlying filesystem:
+
+1. When you list a directory, you see all the files which have been created in it,
+and no files which are not in it (i.e. have been deleted).
+1. When you rename a directory, it is an `O(1)` atomic transaction. No other
+process across the cluster may rename a file or directory to the same path.
+If the rename fails for any reason, either the data is at the original location,
+or it is at the destination, in which case the rename actually succeeded.
+
+**The S3 object store and the `s3a://` filesystem client cannot meet these requirements.**
+
+1. Amazon S3 has inconsistent directory listings unless S3Guard is enabled.
+1. The S3A client mimics `rename()` by copying files and then deleting the originals.
+This can fail partway through, and there is nothing to prevent any other process
+in the cluster attempting a rename at the same time.
+
+As a result,
+
+* Files may not be listed, hence not renamed into place.
+* Deleted files may still be discovered, confusing the rename process to the point
+of failure.
+* If a rename fails, the data is left in an unknown state.
+* If more than one process attempts to commit work simultaneously, the output
+directory may contain the results of both processes: it is no longer an exclusive
+operation.
+* While S3Guard may deliver the listing consistency, commit time is still
+proportional to the amount of data created. It still can't handle task failure.
+
+**Using the "classic" `FileOutputCommitter` to commit work to Amazon S3 risks
+loss or corruption of generated data.**
+
+
+To address these problems there is now explicit support in the `hadoop-aws`
+module for committing work to Amazon S3 via the S3A filesystem client:
+*the S3A Committers*.
+
+
+For safe, as well as high-performance, output of work to S3,
+we need to use "a committer" explicitly written to work with S3, treating it as
+an object store with special features.
+
+
+### Background: Hadoop's "Commit Protocol"
+
+How exactly is work written to its final destination? That is accomplished by
+a "commit protocol" between the workers and the job manager.
+
+This protocol is implemented in Hadoop MapReduce, with a similar but extended
+version in Apache Spark:
+
+1. A "Job" is the entire query, from inputs to outputs.
+1. The "Job Manager" is the process in charge of choreographing the execution
+of the job. It may perform some of the actual computation too.
+1. The job has "workers", which are processes which work on the actual data
+and write the results.
+1. Workers execute "Tasks", which are fractions of the job: the job's
+input has been *partitioned* into units of work which can be executed independently.
+1. The Job Manager directs workers to execute "tasks", usually trying to schedule
+the work close to the data (if the filesystem provides locality information).
+1. Workers can fail: the Job Manager needs to detect this and reschedule their active tasks.
+1. Workers can also become separated from the Job Manager, a "network partition".
+It is (provably) impossible for the Job Manager to distinguish a running-but-unreachable
+worker from a failed one.
+1. The output of a failed task must not be visible; this is to avoid its
+data getting into the final output.
+1. Multiple workers can be instructed to evaluate the same partition of the work;
+this "speculation" delivers speedup as it can address the "straggler problem".
+When multiple workers are working on the same data, only one worker is allowed
+to write the final output.
+1. The entire job may fail (often from the failure of the Job Manager (MR Master, Spark Driver, ...)).
+1. The network may partition, with workers isolated from each other or
+the process managing the entire commit.
+1. Restarted jobs may recover from a failure by reusing the output of all
+completed tasks (MapReduce with the "v1" algorithm), or just by rerunning everything
+(the "v2" algorithm and Spark).
+
+
+What is "the commit protocol" then? It is the requirements on workers as to
+when their data is made visible, where, for a filesystem, "visible" means "can
+be seen in the destination directory of the query."
+
+* There is a destination directory of work, "the output directory."
+* The final output of tasks must be in this directory *or paths underneath it*.
+* The intermediate output of a task must not be visible in the destination directory.
+That is: they must not write directly to the destination.
+* The final output of a task *may* be visible under the destination.
+* The Job Manager makes the decision as to whether a task's data is to be "committed",
+be it directly to the final directory or to some intermediate store.
+* Individual workers communicate with the Job Manager to manage the commit process:
+whether the output is to be *committed* or *aborted*.
+* When a worker commits the output of a task, it somehow promotes its intermediate work to becoming
+final.
+* When a worker aborts a task's output, that output must not become visible
+(i.e. it is not committed).
+* Jobs themselves may be committed/aborted (the nature of "when" is not covered here).
+* After a Job is committed, all its work must be visible.
+* And a file `_SUCCESS` may be written to the output directory.
+* After a Job is aborted, all its intermediate data is lost.
+* Jobs may also fail. When restarted, the successor job must be able to clean up
+all the intermediate and committed work of its predecessor(s).
+* Task and Job processes measure the intervals between communications with their
+Application Master and YARN respectively.
+When the interval has grown too large they must conclude
+that the network has partitioned and that they must abort their work.
+
+
+That's "essentially" it. When working with HDFS and similar filesystems,
+directory `rename()` is the mechanism used to commit the work of tasks and
+jobs.
+
+* Tasks write data to task attempt directories under the directory `_temporary`
+underneath the final destination directory.
+* When a task is committed, these files are renamed to the destination directory
+(v2 algorithm) or a job attempt directory under `_temporary` (v1 algorithm).
+* When a job is committed, for the v2 algorithm the `_SUCCESS` file is created,
+and the `_temporary` deleted.
+* For the v1 algorithm, when a job is committed, all the tasks committed under
+the job attempt directory will have their output renamed into the destination
+directory.
+* The v2 algorithm recovers from failure by deleting the destination directory
+and restarting the job.
+* The v1 algorithm recovers from failure by discovering all committed tasks
+whose output is in the job attempt directory, *and only rerunning all uncommitted tasks*.
+
+
+None of these algorithms work safely or swiftly when working with "raw" AWS S3 storage:
+
+* Directory listing can be inconsistent: the tasks and jobs may not list all work to
+be committed.
+* Renames go from being fast, atomic operations to slow operations which can fail partway through.
+
+This then is the problem which the S3A committers address:
+
+*How to safely and reliably commit work to Amazon S3 or a compatible object store.*
+
+
+## Meet the S3A Committers
+
+Since Hadoop 3.1, the S3A FileSystem has been accompanied by classes
+designed to integrate with the Hadoop and Spark job commit protocols, classes
+which interact with the S3A filesystem to reliably commit work to S3:
+*The S3A Committers*.
+
+The underlying architecture of this process is very complex, and
+covered in [the committer architecture documentation](./committer_architecture.html).
+
+The key concept to know of is S3's "Multipart Upload" mechanism. This allows
+an S3 client to write data to S3 in multiple HTTP POST requests, only completing
+the write operation with a final POST to complete the upload; this final POST
+consisting of a short list of the etags of the uploaded blocks.
+This multipart upload mechanism is already automatically used when writing large
+amounts of data to S3; an implementation detail of the S3A output stream.
+
+The S3A committers make explicit use of this multipart upload ("MPU") mechanism:
+
+1. The individual *tasks* in a job write their data to S3 as POST operations
+within multipart uploads, yet do not issue the final POST to complete the upload.
+1. The multipart uploads are committed in the job commit process.
+
+There are two different S3A committer types, *staging*
+and *magic*. The committers primarily vary in how data is written during task execution,
+how the pending commit information is passed to the job manager, and in how
+conflict with existing files is resolved.
+
+
+| feature | staging | magic |
+|--------|---------|---|
+| task output destination | local disk | S3A *without completing the write* |
+| task commit process | upload data from disk to S3 | list all pending uploads in S3 and write details to job attempt directory |
+| task abort process | delete local disk data | list all pending uploads and abort them |
+| job commit | list & complete pending uploads | list & complete pending uploads |
+
+The other metric is "maturity". There, the fact that the Staging committers
+are based on Netflix's production code counts in their favor.
+
+
+### The Staging Committer
+
+This is based on work from Netflix. It "stages" data into the local filesystem.
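+It also requires the cluster to have HDFS, so that the information needed
+to complete the uploads can be passed from the tasks to the job committer.
+
+Tasks write to URLs with `file://` schemes. When a task is committed,
+its files are listed and uploaded to S3 as incomplete Multipart Uploads.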
+The information needed to complete the uploads is saved to HDFS where
+it is committed through the standard "v1" commit algorithm.
+
+When the Job is committed, the Job Manager reads the lists of pending writes from its
+HDFS Job destination directory and completes those uploads.
+
+Cancelling a task is straightforward: the local directory is deleted with
+its staged data. Cancelling a job is achieved by reading in the lists of
+pending writes from the HDFS job attempt directory, and aborting those
+uploads. For extra safety, all outstanding multipart writes to the destination directory
+are aborted.
+
+The staging committer comes in two slightly different forms, with slightly
+different conflict resolution policies:
+
+
+* **Directory**: the entire directory tree of data is written or overwritten,
+as normal.
+
+* **Partitioned**: special handling of partitioned directory trees of the form
+`YEAR=2017/MONTH=09/DAY=19`: conflict resolution is limited to the partitions
+being updated.
+
+
+The Partitioned Committer is intended to allow jobs updating a partitioned
+directory tree to restrict the conflict resolution to only those partition
+directories containing new data. It is intended for use with Apache Spark
+only.
+
+
+## Conflict Resolution in the Staging Committers
+
+The Staging committers offer the ability to replace the conflict policy
+of the execution engine with a policy designed to work with the tree of data.
+This is based on the experience and needs of Netflix, where efficiently adding
+new data to an existing partitioned directory tree is a common operation.
+
+```xml
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>fail</value>
+  <description>
+    Staging committer conflict resolution policy.
+    Supported: fail, append, replace.
+  </description>
+</property>
+```
+
+**replace** : when the job is committed (and not before), delete files in
+directories into which new data will be written.
+
+**fail**: when there are existing files in the destination, fail the job.
+
+**append**: Add new data to the directories at the destination, overwriting
+any files with the same name. Reliable use requires unique names for generated files,
+which the committers generate by default.
+
+The differences between the two staging committers are as follows:
+
+The Directory Committer uses the entire directory tree for conflict resolution.
+If any file exists at the destination it will fail in job setup; if the resolution
+mechanism is "replace" then all existing files will be deleted.
+
+The partitioned committer calculates the partitions into which files are added,
+the final directories in the tree, and uses that in its conflict resolution
+process:
+
+
+**replace** : delete all data in the destination partition before committing
+the new files.
+
+**fail**: fail if there is data in the destination partition, ignoring the state
+of any parallel partitions.
+
+**append**: add the new data.
+
+It's intended for use in Apache Spark Dataset operations, rather
+than Hadoop's original MapReduce engine, and only in jobs
+where adding new data to an existing dataset is the desired goal.
+
+Prerequisites for successful work:
+
+1. The output is written into partitions via `PARTITIONED BY` or `partitionBy()`
+instructions.
+2. There is no data written directly to the root path (all files there are
+ignored; it's implicitly "append").
+
+Here's an example in Spark, assuming that `sourceDataset` is a dataset
+whose columns include "year" and "month":
+
+```scala
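+import org.apache.spark.sql.SaveMode
+
+// partition the output by year and month; replace only the partitions written to
+sourceDataset
+  .write
+  .partitionBy("year", "month")
+  .mode(SaveMode.Append)
+  .option("fs.s3a.committer.name", "partitioned")
+  .option("fs.s3a.committer.staging.conflict-mode", "replace")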
+  .format("orc")
+  .save("s3a://examples/statistics")
+```
+
+
+### The Magic Committer
+
+The "Magic" committer does its work through "magic" in the filesystem:
+attempts to write to specific "magic" paths are interpreted as writes
+to a parent directory *which are not to be completed*. When the output stream
+is closed, the information needed to complete the write is saved in the magic
+directory. The task committer saves the list of these to a directory for the
+job committer's use, or, if aborting, lists the pending writes and aborts them.
+
+The job committer reads in the list of pending commits, and commits them as
+the Staging Committer does.
+
+Compared to the Staging Committer, the Magic Committer offers faster write
+times: output is uploaded to S3 as it is written, rather than in the
+task commit.
+
+However, it has extra requirements of the filesystem:
+
+1. It requires a consistent object store, which for Amazon S3,
+means that [S3Guard](./s3guard.html) must be enabled. For third-party stores,
+consult the documentation.
+1. The S3A client must be configured to recognize interactions
+with the magic directories and treat them specially.
+
+
+It's also not been field tested to the extent of Netflix's committer; consider
+it the least mature of the committers.
+
+
+#### Which Committer to Use?
+
+1. If you want to create or update existing partitioned data trees in Spark, use the
+Partitioned Committer. Make sure you have enough hard disk capacity for all staged data.
+Do not use it in other situations.
+
+1. If you know that your object store is consistent, or that the processes
+writing data use S3Guard, use the Magic Committer for higher performance
+writing of large amounts of data.
+
+1. Otherwise: use the directory committer, making sure you have enough
+hard disk capacity for all staged data.
+
+Put differently: start with the Directory Committer.
+
+## Switching to an S3A Committer
+
+To use an S3A committer, the property `mapreduce.outputcommitter.factory.scheme.s3a`
+must be set to the S3A committer factory, `org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory`.
+This is done in `mapred-default.xml`:
+
+```xml
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    The committer factory to use when writing data to S3A filesystems.
+  </description>
+</property>
+```
+
+What is missing is an explicit choice of committer in the property
+`fs.s3a.committer.name`; without one, the classic (and unsafe) file committer is used.
+
+| `fs.s3a.committer.name` |  Committer |
+|--------|---------|
+| `directory` | directory staging committer |
+| `partitioned` | partition staging committer (for use in Spark only) |
+| `magic` | the "magic" committer |
+| `file` | the original and unsafe File committer (default) |
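+
+As a minimal sketch, assuming the factory binding above is in place, the
+directory staging committer could be selected by setting the property to one
+of the names in the table. Only the property name and value come from this
+page; which configuration file or job option carries it is left to the deployment.
+
+```xml
+<!-- illustrative: pick the directory staging committer -->
+<property>
+  <name>fs.s3a.committer.name</name>
+  <value>directory</value>
+</property>
+```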
+
+
+
+## Using the Directory and Partitioned Staging Committers
+
+Generated files are initially written to a local directory underneath one of the temporary
+directories listed in `fs.s3a.buffer.dir`.
+
+
+The staging committer needs a path in the cluster filesystem
+(e.g. HDFS). This must be declared in `fs.s3a.committer.staging.tmp.path`.
+
+Temporary files are saved in HDFS (or other cluster filesystem) under the path
+`${fs.s3a.committer.staging.tmp.path}/${user}` where `user` is the name of the user running the job.
+The default value of `fs.s3a.committer.staging.tmp.path` is `tmp/staging`,
+which will be converted at run time to a path under the current user's home directory,
+essentially `~/tmp/staging`.
+
+The application attempt ID is used to create a unique path under this directory,
+resulting in a path `~/tmp/staging/${user}/${application-attempt-id}/` under which
+summary data of each task's pending commits are managed using the standard
+`FileOutputFormat` committer.
+
+When a task is committed the data is uploaded under the destination directory.
+The policy of how to react if the destination exists is defined by
+the `fs.s3a.committer.staging.conflict-mode` setting.
+
+| `fs.s3a.committer.staging.conflict-mode` | Meaning |
+| -----------------------------------------|---------|
+| `fail` | Fail if the destination directory exists |
+| `replace` | Delete all existing files before committing the new data |
+| `append` | Add the new files to the existing directory tree |
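+
+Putting these settings together, a staging committer setup might look like the
+sketch below. The `/tmp/hadoop-s3a` local directory is a hypothetical value
+chosen for illustration; the other values are the documented default and one
+of the options from the table above.
+
+```xml
+<!-- hypothetical local directory; it needs room for all staged data -->
+<property>
+  <name>fs.s3a.buffer.dir</name>
+  <value>/tmp/hadoop-s3a</value>
+</property>
+
+<!-- the default: resolved under the user's home directory in the cluster filesystem -->
+<property>
+  <name>fs.s3a.committer.staging.tmp.path</name>
+  <value>tmp/staging</value>
+</property>
+
+<!-- delete existing files before committing the new data -->
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>replace</value>
+</property>
+```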
+
+
+## The "Partitioned" Staging Committer
+
+This committer is an extension of the "Directory" committer which has a special conflict resolution
+policy designed to support operations which insert new data into a directory tree structured
+using Hive's partitioning strategy: different levels of the tree represent different columns.
+
+For example, log data could be partitioned by `YEAR` and then by `MONTH`, with different
+entries underneath.
+
+```
+logs/YEAR=2017/MONTH=01/
+  log-20170101.avro
+  log-20170102.avro
+  ...
+  log-20170131.avro
+
+logs/YEAR=2017/MONTH=02/
+  log-20170201.avro
+  log-20170202.avro
+  ...
+  log-20170227.avro
+
+logs/YEAR=2017/MONTH=03/
+logs/YEAR=2017/MONTH=04/
+```
+
+A partitioned structure like this allows for queries using Hive or Spark to filter out
+files which do not contain relevant data.
+
+Where the tooling permits, the partitioned committer allows callers
+to add data to an existing partitioned layout.
+
+More specifically, it does this by having conflict resolution options which
+only act on individual partitions, rather than across the entire output tree.
+
+| `fs.s3a.committer.staging.conflict-mode` | Meaning |
+| -----------------------------------------|---------|
+| `fail` | Fail if the destination partition(s) exist |
+| `replace` | Delete the existing data partitions before committing the new data |
+| `append` | Add the new data to the existing partitions |
+
+
+As an example, if a job was writing the file
+`logs/YEAR=2017/MONTH=02/log-20170228.avro`, then with a policy of `fail`,
+the job would fail. With a policy of `replace`, the entire directory
+`logs/YEAR=2017/MONTH=02/` would be deleted before the new file `log-20170228.avro`
+was written. With the policy of `append`, the new file would be added to
+the existing set of files.
+
+
+### Notes
+
+1. A deep partition tree can itself be a performance problem in S3 and the s3a client,
+or, more specifically, a problem with applications which use recursive directory tree
+walks to work with data.
+
+1. The outcome if you have more than one job trying simultaneously to write data
+to the same destination with any policy other than "append" is undefined.
+
+1. In the `append` operation, there is no check for conflict with file names.
+If, in the example above, the file `log-20170228.avro` already existed,
+it would be overwritten. Set `fs.s3a.committer.staging.unique-filenames` to `true`
+to ensure that a UUID is included in every filename to avoid this.
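+
+A sketch of an append-friendly configuration, using only the options named in
+the notes above, could be:
+
+```xml
+<!-- add new files to the existing partitions -->
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>append</value>
+</property>
+
+<!-- include a unique ID in each filename so appended files do not overwrite existing ones -->
+<property>
+  <name>fs.s3a.committer.staging.unique-filenames</name>
+  <value>true</value>
+</property>
+```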
+
+
+## Using the Magic committer
+
+This is less mature than the Staging Committer, but promises higher
+performance.
+
+### FileSystem client setup
+
+1. Use a *consistent* S3 object store. For Amazon S3, this means enabling
+[S3Guard](./s3guard.html). For S3-compatible filesystems, consult the filesystem
+documentation to see if it is consistent, hence compatible "out of the box".
+1. Turn the magic on by setting `fs.s3a.committer.magic.enabled` to `true`:
+
+```xml
+<property>
+  <name>fs.s3a.committer.magic.enabled</name>
+  <description>
+  Enable support in the filesystem for the S3 "Magic" committer.
+  </description>
+  <value>true</value>
+</property>
+```
+
+**Do not use the Magic Committer on an inconsistent S3 object store.** For
+Amazon S3, that means S3Guard must *always* be enabled.
+
+
+### Enabling the committer
+
+```xml
+<property>
+  <name>fs.s3a.committer.name</name>
+  <value>magic</value>
+</property>
+
+```
+
+Conflict management is left to the execution engine itself.
+
+## Committer Configuration Options
+
+
+| Option | Magic | Directory | Partitioned | Meaning | Default |
+|--------|-------|-----------|-------------|---------|---------|
+| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file  at the end of each job | `true` |
+| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 |
+| `fs.s3a.committer.staging.conflict-mode` |  | X | X | Conflict resolution: `fail`, `append` or `replace`| `fail` |
+| `fs.s3a.committer.staging.unique-filenames` |  | X | X | Generate unique filenames | `true` |
+| `fs.s3a.committer.magic.enabled` | X |  | | Enable "magic committer" support in the filesystem | `false` |
+
+
+
+
+| Option | Magic | Directory | Partitioned | Meaning | Default |
+|--------|-------|-----------|-------------|---------|---------|
+| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being written and/or staged. | |
+| `fs.s3a.committer.staging.tmp.path` |  | X | X | Path in the cluster filesystem for temporary data | `tmp/staging` |
+
+
+```xml
+<property>
+  <name>fs.s3a.committer.name</name>
+  <value>file</value>
+  <description>
+    Committer to create for output to S3A, one of:
+    "file", "directory", "partitioned", "magic".
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.magic.enabled</name>
+  <value>false</value>
+  <description>
+    Enable support in the filesystem for the S3 "Magic" committer.
+    When working with AWS S3, S3Guard must be enabled for the destination
+    bucket, as consistent metadata listings are required.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.threads</name>
+  <value>8</value>
+  <description>
+    Number of threads in committers for parallel operations on files
+    (upload, commit, abort, delete...)
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.tmp.path</name>
+  <value>tmp/staging</value>
+  <description>
+    Path in the cluster filesystem for temporary data.
+    This is for HDFS, not the local filesystem.
+    It is only for the summary data of each file, not the actual
+    data being committed.
+    Using an unqualified path guarantees that the full path will be
+    generated relative to the home directory of the user creating the job,
+    hence private (assuming home directory permissions are secure).
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.unique-filenames</name>
+  <value>true</value>
+  <description>
+    Option for final files to have a unique name through job attempt info,
+    or the value of fs.s3a.committer.staging.uuid
+    When writing data with the "append" conflict option, this guarantees
+    that new data will not overwrite any existing data.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>fail</value>
+  <description>
+    Staging committer conflict resolution policy.
+    Supported: "fail", "append", "replace".
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+  <value>true</value>
+  <description>
+    Should the staging committers abort all pending uploads to the destination
+    directory?
+
+    Change this if more than one partitioned committer is
+    writing to the same destination tree simultaneously; otherwise
+    the first job to complete will cancel all outstanding uploads from the
+    others. However, it may lead to leaked outstanding uploads from failed
+    tasks. If disabled, configure the bucket lifecycle to remove uploads
+    after a time period, and/or set up a workflow to explicitly delete
+    entries. Otherwise there is a risk that uncommitted uploads may run up
+    bills.
+  </description>
+</property>
+
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    The committer factory to use when writing data to S3A filesystems.
+    If mapreduce.outputcommitter.factory.class is set, it will
+    override this property.
+
+    (This property is set in mapred-default.xml)
+  </description>
+</property>
+
+```
+
+
+## Troubleshooting
+
+### `Filesystem does not have support for 'magic' committer`
+
+```
+org.apache.hadoop.fs.s3a.commit.PathCommitException: `s3a://landsat-pds': Filesystem does not have support for 'magic' committer enabled
+in configuration option fs.s3a.committer.magic.enabled
+```
+
+The Job is configured to use the magic committer, but the S3A bucket has not been explicitly
+declared as supporting it.
+
+The destination bucket **must** be declared as supporting the magic committer.
+
+This can be done for those buckets which are known to be consistent, either
+because [S3Guard](s3guard.html) is used to provide consistency,
+or because the S3-compatible filesystem is known to be strongly consistent.
+
+```xml
+<property>
+  <name>fs.s3a.bucket.landsat-pds.committer.magic.enabled</name>
+  <value>true</value>
+</property>
+```
+
+*IMPORTANT*: only enable the magic committer against object stores which
+offer consistent listings. By default, Amazon S3 does not do this, which is
+why the option `fs.s3a.committer.magic.enabled` is disabled by default.
+
+
+Tip: you can verify that a bucket supports the magic committer through the
+`hadoop s3guard bucket-info` command:
+
+
+```
+> hadoop s3guard bucket-info -magic s3a://landsat-pds/
+
+Filesystem s3a://landsat-pds
+Location: us-west-2
+Filesystem s3a://landsat-pds is not using S3Guard
+The "magic" committer is not supported
+
+S3A Client
+  Endpoint: fs.s3a.endpoint=(unset)
+  Encryption: fs.s3a.server-side-encryption-algorithm=none
+  Input seek policy: fs.s3a.experimental.input.fadvise=normal
+2017-09-27 19:18:57,917 INFO util.ExitUtil: Exiting with status 46: 46: The magic committer is not enabled for s3a://landsat-pds
+```
+
+### Error message: "File being created has a magic path, but the filesystem has magic file support disabled"
+
+A file is being written to a path which is used for "magic" files,
+files which are actually written to a different destination than their stated path,
+*but the filesystem doesn't support "magic" files*.
+
+This message should not appear through the committer itself, which will
+fail with the error message in the previous section. It may arise
+if other applications are attempting to create files under the path `/__magic/`.
+
+Make sure the filesystem meets the requirements of the magic committer
+(a consistent S3A filesystem through S3Guard or the S3 service itself),
+and set the `fs.s3a.committer.magic.enabled` flag to indicate that magic file
+writes are supported.
+
+
+### `FileOutputCommitter` appears to be still used (from logs or delays in commits)
+
+The Staging committers use the original `FileOutputCommitter` to manage
+the propagation of commit information: do not worry if the logs show `FileOutputCommitter`
+working with data in the cluster filesystem (e.g. HDFS).
+
+One way to make sure that the `FileOutputCommitter` is not being used to write
+the data to S3 is to set the option `mapreduce.fileoutputcommitter.algorithm.version`
+to a value such as "10". Because the only supported algorithms are "1" and "2",
+any erroneously created `FileOutputCommitter` will raise an exception in its constructor
+when instantiated:
+
+```
+java.io.IOException: Only 1 or 2 algorithm version is supported
+at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:130)
+at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.<init>(FileOutputCommitter.java:104)
+at org.apache.parquet.hadoop.ParquetOutputCommitter.<init>(ParquetOutputCommitter.java:42)
+at org.apache.parquet.hadoop.ParquetOutputFormat.getOutputCommitter(ParquetOutputFormat.java:395)
+at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupCommitter(HadoopMapReduceCommitProtocol.scala:67)
+at com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol.setupCommitter(PathOutputCommitProtocol.scala:62)
+at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:124)
+at com.hortonworks.spark.cloud.commit.PathOutputCommitProtocol.setupJob(PathOutputCommitProtocol.scala:152)
+at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:175)
+at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:145)
+```
+
+While that will not make the problem go away, it will at least make
+the failure happen at the start of a job.
+
+(Setting this option will not interfere with the Staging Committers' use of HDFS,
+as it explicitly sets the algorithm to "2" for that part of its work).
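+
+The check described above could be expressed as the following job or cluster
+option. The value "10" is simply the unsupported value suggested in the text;
+any value other than "1" or "2" should have the same effect.
+
+```xml
+<property>
+  <name>mapreduce.fileoutputcommitter.algorithm.version</name>
+  <value>10</value>
+  <description>
+    Deliberately invalid: makes any FileOutputCommitter fail in its
+    constructor, so a misconfigured job fails at startup.
+  </description>
+</property>
+```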
+
+The other way to check which committer was used is to examine the `_SUCCESS` file.
+If it is 0-bytes long, the classic `FileOutputCommitter` committed the job.
+The S3A committers all write a non-empty JSON file; the `committer` field lists
+the committer used.
+
+
+*Common causes*
+
+1. The property `fs.s3a.committer.name` is set to "file". Fix: change it to
+the name of one of the S3A committers.
+1. The job has overridden the property `mapreduce.outputcommitter.factory.class`
+with a new factory class for all committers. This takes priority over
+all committers registered for the s3a:// schema.
+1. The property `mapreduce.outputcommitter.factory.scheme.s3a` is unset.
+1. The output format has overridden `FileOutputFormat.getOutputCommitter()`
+and is returning its own committer -one which is a subclass of `FileOutputCommitter`.
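+
+For the first and third causes in the list above, a configuration along these
+lines may help. This is a sketch, not a definitive setup: the factory class
+name and the committer name `directory` are taken from the S3A committer
+documentation as understood here, and should be checked against the Hadoop
+release in use.
+
+```xml
+<property>
+  <name>mapreduce.outputcommitter.factory.scheme.s3a</name>
+  <value>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</value>
+  <description>
+    Committer factory to use for s3a:// destinations.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.committer.name</name>
+  <value>directory</value>
+  <description>
+    Any value other than "file" selects one of the S3A committers.
+  </description>
+</property>
+```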
+
+That final cause, *the output format is returning its own committer*, is not
+easily fixed; it may be that the custom committer performs critical work
+during its lifecycle, and contains assumptions about the state of the written
+data during task and job commit (i.e. it is in the destination filesystem).
+Consult with the authors/maintainers of the output format
+to see whether it would be possible to integrate with the new committer factory
+mechanism and object-store-specific commit algorithms.
+
+Parquet is a special case here: its committer does no extra work
+other than add the option to read all newly-created files then write a schema
+summary. The Spark integration has explicit handling for Parquet to enable it
+to support the new committers, removing this (slow on S3) option.
+
+If you have subclassed `FileOutputCommitter` and want to move to the
+factory model, please get in touch.
+
+
+## Job/Task fails with PathExistsException: Destination path exists and committer conflict resolution mode is "fail"
+
+This surfaces when either of two conditions is met.
+
+1. The Directory committer is used with `fs.s3a.committer.staging.conflict-mode` set to
+`fail` and the output/destination directory exists.
+The job will fail in the driver during job setup.
+1. The Partitioned Committer is used with `fs.s3a.committer.staging.conflict-mode` set to
+`fail` and one of the destination partitions already exists. The specific task(s)
+generating conflicting data will fail during task commit, which will cause the
+entire job to fail.
+
+If you are trying to write data and want write conflicts to be rejected, this is the correct
+behavior: there was data at the destination so the job was aborted.
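+
+If rejecting conflicts is *not* what is wanted, the conflict mode can be
+changed. The sketch below assumes the staging committers also accept the
+values `append` and `replace`, as described in the committer documentation;
+pick whichever matches the intended semantics.
+
+```xml
+<property>
+  <name>fs.s3a.committer.staging.conflict-mode</name>
+  <value>replace</value>
+  <description>
+    Assumed alternatives to "fail": "append" or "replace".
+  </description>
+</property>
+```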
+
+## Staging committer task fails with IOException: No space left on device
+
+There's not enough space on the local hard disk (real or virtual)
+to store all the uncommitted data of the active tasks on that host.
+Because the staging committers write all output to the local disk
+and only upload the data on task commits, enough local temporary
+storage is needed to store all output generated by all uncommitted
+tasks running on the single host. Small EC2 VMs may run out of disk.
+
+1. Make sure that `fs.s3a.buffer.dir` includes a temporary directory on
+every available hard disk; this spreads load better.
+
+1. Add more disk space. In EC2: request instances with more local storage.
+There is no need for EMR storage; this is just for temporary data.
+
+1. Purge the directories listed in `fs.s3a.buffer.dir` of old data.
+Failed tasks may not clean up all old files.
+
+1. Reduce the number of worker threads/processes on the host.
+
+1. Consider partitioning the job into more tasks. This *may* result in more tasks
+generating less data each.
+
+1. Use the magic committer. This only needs enough disk storage to buffer
+blocks of the file currently being written during their upload process,
+so can use a lot less disk space.
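+
+As an illustration of the first item in the list above, `fs.s3a.buffer.dir`
+takes a comma-separated list of directories. The mount points shown here are
+hypothetical; substitute the local disks actually present on the worker hosts.
+
+```xml
+<property>
+  <name>fs.s3a.buffer.dir</name>
+  <!-- hypothetical paths: one temporary directory per local disk -->
+  <value>/mnt/disk1/tmp/s3a,/mnt/disk2/tmp/s3a</value>
+  <description>
+    Directories used to buffer data before upload.
+  </description>
+</property>
+```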

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index 75c638f..aaf9f23 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -28,6 +28,8 @@ See also:
 * [Encryption](./encryption.html)
 * [S3Guard](./s3guard.html)
 * [Troubleshooting](./troubleshooting_s3a.html)
+* [Committing work to S3 with the "S3A Committers"](./committers.html)
+* [S3A Committers Architecture](./committer_architecture.html)
 * [Testing](./testing.html)
 
 ##<a name="overview"></a> Overview
@@ -82,7 +84,7 @@ the Hadoop project itself.
 1. Amazon EMR's `s3://` client. This is from the Amazon EMR team, who actively
 maintain it.
 1. Apache's Hadoop's [`s3n:` filesystem client](./s3n.html).
-   This connectore is no longer available: users must migrate to the newer `s3a:` client.
+   This connector is no longer available: users must migrate to the newer `s3a:` client.
 
 
 ##<a name="getting_started"></a> Getting Started
@@ -177,6 +179,7 @@ Parts of Hadoop relying on this can have unexpected behaviour. E.g. the
 `AggregatedLogDeletionService` of YARN will not remove the appropriate logfiles.
 * Directory listing can be slow. Use `listFiles(path, recursive)` for high
 performance recursive listings whenever possible.
+* It is possible to create files under files if the caller tries hard.
 * The time to rename a directory is proportional to the number of files
 underneath it (directory or indirectly) and the size of the files. (The copyis
 executed inside the S3 storage, so the time is independent of the bandwidth
@@ -184,8 +187,13 @@ from client to S3).
 * Directory renames are not atomic: they can fail partway through, and callers
 cannot safely rely on atomic renames as part of a commit algorithm.
 * Directory deletion is not atomic and can fail partway through.
-* It is possible to create files under files if the caller tries hard.
 
+The final three issues surface when using S3 as the immediate destination
+of work, as opposed to HDFS or another "real" filesystem.
+
+The [S3A committers](./committers.html) are the sole mechanism available
+to safely save the output of queries directly into S3 object stores
+through the S3A filesystem.
 
 
 ### Warning #3: Object stores have differerent authorization models
@@ -223,18 +231,6 @@ Do not inadvertently share these credentials through means such as
 
 If you do any of these: change your credentials immediately!
 
-### Warning #5: The S3A client cannot be used on Amazon EMR
-
-On Amazon EMR `s3a://` URLs are not supported; Amazon provide
-their own filesystem client, `s3://`.
-If you are using Amazon EMR, follow their instructions for use —and be aware
-that all issues related to S3 integration in EMR can only be addressed by Amazon
-themselves: please raise your issues with them.
-
-Equally importantly: much of this document does not apply to the EMR `s3://` client.
-Pleae consult
-[the EMR storage documentation](http://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-plan-file-systems.html)
-instead.
 
 ## <a name="authenticating"></a> Authenticating with S3
 
@@ -616,7 +612,7 @@ Because the provider path is not itself a sensitive secret, there is no risk
 from placing its declaration on the command line.
 
 
-## <a name="general_configuration"></a>Genaral S3A Client configuration
+## <a name="general_configuration"></a>General S3A Client configuration
 
 All S3A client options are configured with options with the prefix `fs.s3a.`.
 
@@ -875,6 +871,166 @@ options are covered in [Testing](./testing.md).
 </property>
 ```
 
+## <a name="retry_and_recovery"></a>Retry and Recovery
+
+The S3A client makes a best-effort attempt at recovering from network failures;
+this section covers the details of what it does.
+
+The S3A client divides exceptions returned by the AWS SDK into different categories,
+and chooses a different retry policy based on their type and whether or
+not the failing operation is idempotent.
+
+
+### Unrecoverable Problems: Fail Fast
+
+* No object/bucket store: `FileNotFoundException`
+* No access permissions: `AccessDeniedException`
+* Network errors considered unrecoverable (`UnknownHostException`,
+ `NoRouteToHostException`, `AWSRedirectException`).
+* Interruptions: `InterruptedIOException`, `InterruptedException`.
+* Rejected HTTP requests: `InvalidRequestException`
+
+These are all considered unrecoverable: S3A will make no attempt to recover
+from them.
+
+### Possibly Recoverable Problems: Retry
+
+* Connection timeout: `ConnectTimeoutException`. Timeout before
+setting up a connection to the S3 endpoint (or proxy).
+* HTTP response status code 400, "Bad Request"
+
+The status code 400, Bad Request usually means that the request
+is unrecoverable; it's the generic "No" response. Very rarely it
+does recover, which is why it is in this category, rather than that
+of unrecoverable failures.
+
+These failures will be retried with a fixed sleep interval set in
+`fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.
+
+
+### Only retriable on idempotent operations
+
+Some network failures are considered to be retriable if they occur on
+idempotent operations; there's no way to know if they happened
+after the request was processed by S3.
+
+* `SocketTimeoutException`: general network failure.
+* `EOFException` : the connection was broken while reading data
+* "No response from Server" (443, 444) HTTP responses.
+* Any other AWS client, service or S3 exception.
+
+These failures will be retried with a fixed sleep interval set in
+`fs.s3a.retry.interval`, up to the limit set in `fs.s3a.retry.limit`.
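+
+The two options named above could be set as follows. The values are
+illustrative only and are not recommendations or confirmed defaults.
+
+```xml
+<property>
+  <name>fs.s3a.retry.interval</name>
+  <value>500ms</value>
+  <description>
+    Fixed sleep interval between retry attempts.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.limit</name>
+  <!-- illustrative value -->
+  <value>7</value>
+  <description>
+    Maximum number of retries of a retriable failure.
+  </description>
+</property>
+```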
+
+*Important*: DELETE is considered idempotent, hence: `FileSystem.delete()`
+and `FileSystem.rename()` will retry their delete requests on any
+of these failures.
+
+The issue of whether delete should be idempotent has been a source
+of historical controversy in Hadoop.
+
+1. In the absence of any other changes to the object store, a repeated
+DELETE request will eventually result in the named object being deleted;
+it's a no-op if reprocessed. As, indeed, is `FileSystem.delete()`.
+1. If another client creates a file under the path, it will be deleted.
+1. Any filesystem supporting an atomic `FileSystem.create(path, overwrite=false)`
+operation to reject file creation if the path exists MUST NOT consider
+delete to be idempotent, because a `create(path, false)` operation will
+only succeed if the first `delete()` call has already succeeded.
+1. And a second, retried `delete()` call could delete the new data.
+
+Because S3 is eventually consistent *and* doesn't support an
+atomic create-no-overwrite operation, the choice is more ambiguous.
+
+Currently S3A considers delete to be
+idempotent because it is convenient for many workflows, including the
+commit protocols. Just be aware that in the presence of transient failures,
+more things may be deleted than expected. (For anyone who considers this to
+be the wrong decision: rebuild the `hadoop-aws` module with the constant
+`S3AFileSystem.DELETE_CONSIDERED_IDEMPOTENT` set to `false`).
+
+
+
+
+
+
+### Throttled requests from S3 and DynamoDB
+
+
+When S3 or DynamoDB returns a response indicating that requests
+from the caller are being throttled, the request is retried with an
+exponential back-off, using a configurable initial interval and a maximum
+number of attempts.
+
+```xml
+<property>
+  <name>fs.s3a.retry.throttle.limit</name>
+  <value>${fs.s3a.attempts.maximum}</value>
+  <description>
+    Number of times to retry any throttled request.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.throttle.interval</name>
+  <value>1000ms</value>
+  <description>
+    Interval between retry attempts on throttled requests.
+  </description>
+</property>
+```
+
+Notes
+
+1. There is also throttling taking place inside the AWS SDK; this is managed
+by the value `fs.s3a.attempts.maximum`.
+1. Throttling events are tracked in the S3A filesystem metrics and statistics.
+1. Amazon KMS may throttle a customer based on the total rate of uses of
+KMS *across all user accounts and applications*.
+
+Throttling of S3 requests is all too common; it is caused by too many clients
+trying to access the same shard of S3 Storage. This generally
+happens if there are too many reads, those being the most common in Hadoop
+applications. This problem is exacerbated by Hive's partitioning
+strategy used when storing data, such as partitioning by year and then month.
+This results in paths with little or no variation at their start, which ends
+up in all the data being stored in the same shard(s).
+
+Here are some expensive operations; the more of these taking place
+against part of an S3 bucket, the more load it experiences.
+
+* Many clients trying to list directories or calling `getFileStatus` on
+paths (LIST and HEAD requests respectively)
+* The GET requests issued when reading data.
+* Random IO used when reading columnar data (ORC, Parquet) means that many
+more GET requests are issued than in a simple one-per-file read.
+* The number of active writes to that part of the S3 bucket.
+
+A special case is when enough data has been written into part of an S3 bucket
+that S3 decides to split the data across more than one shard: this
+is believed to be done by some copy operation which can take some time.
+While this is under way, S3 clients accessing data under these paths will
+be throttled more than usual.
+
+
+Mitigation strategies
+
+1. Use separate buckets for intermediate data/different applications/roles.
+1. Use significantly different paths for different datasets in the same bucket.
+1. Increase the value of `fs.s3a.retry.throttle.interval` to provide
+longer delays between attempts.
+1. Reduce the parallelism of the queries. The more tasks trying to access
+data in parallel, the more load.
+1. Reduce `fs.s3a.threads.max` to reduce the amount of parallel operations
+performed by clients.
+1. Maybe: increase `fs.s3a.readahead.range` to increase the minimum amount
+of data asked for in every GET request, as well as how much data is
+skipped in the existing stream before aborting it and creating a new stream.
+1. If the DynamoDB tables used by S3Guard are being throttled, increase
+the capacity through `hadoop s3guard set-capacity` (and pay more, obviously).
+1. KMS: "consult AWS about increasing your capacity".
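+
+Several of the mitigation steps above are plain configuration changes. The
+following sketch combines three of them; every value is illustrative and
+would need tuning against the actual workload.
+
+```xml
+<property>
+  <name>fs.s3a.retry.throttle.interval</name>
+  <!-- illustrative: longer than the 1000ms shown earlier -->
+  <value>2000ms</value>
+</property>
+
+<property>
+  <name>fs.s3a.threads.max</name>
+  <!-- illustrative: fewer parallel operations per client -->
+  <value>5</value>
+</property>
+
+<property>
+  <name>fs.s3a.readahead.range</name>
+  <!-- illustrative: 256 KB, in bytes -->
+  <value>262144</value>
+</property>
+```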
+
+
+
+
 ## <a name="per_bucket_configuration"></a>Configuring different S3 buckets with Per-Bucket Configuration
 
 Different S3 buckets can be accessed with different S3A client configurations.
@@ -1081,7 +1237,7 @@ The original S3A client implemented file writes by
 buffering all data to disk as it was written to the `OutputStream`.
 Only when the stream's `close()` method was called would the upload start.
 
-This can made output slow, especially on large uploads, and could even
+This made output slow, especially on large uploads, and could even
 fill up the disk space of small (virtual) disks.
 
 Hadoop 2.7 added the `S3AFastOutputStream` alternative, which Hadoop 2.8 expanded.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
index 893f4ed..e2cb549 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md
@@ -426,7 +426,7 @@ hadoop s3guard diff s3a://ireland-1
 Prints and optionally checks the s3guard and encryption status of a bucket.
 
 ```bash
-hadoop s3guard bucket-info [ -guarded ] [-unguarded] [-auth] [-nonauth] [-encryption ENCRYPTION] s3a://BUCKET
+hadoop s3guard bucket-info [ -guarded ] [-unguarded] [-auth] [-nonauth] [-magic] [-encryption ENCRYPTION] s3a://BUCKET
 ```
 
 Options
@@ -437,6 +437,7 @@ Options
 | `-unguarded` | Require S3Guard to be disabled |
 | `-auth` | Require the S3Guard mode to be "authoritative" |
 | `-nonauth` | Require the S3Guard mode to be "non-authoritative" |
+| `-magic` | Require the S3 filesystem to support the "magic" committer |
 | `-encryption <type>` | Require a specific server-side encryption algorithm  |
 
 The server side encryption options are not directly related to S3Guard, but
@@ -445,10 +446,11 @@ it is often convenient to check them at the same time.
 Example
 
 ```bash
-hadoop s3guard bucket-info -guarded s3a://ireland-1
+hadoop s3guard bucket-info -guarded -magic s3a://ireland-1
 ```
 
 List the details of bucket `s3a://ireland-1`, mandating that it must have S3Guard enabled
+("-guarded") and that support for the magic S3A committer is enabled ("-magic").
 
 ```
 Filesystem s3a://ireland-1
@@ -476,6 +478,7 @@ Metadata Store Diagnostics:
     TableSizeBytes: 12812,ItemCount: 91,
     TableArn: arn:aws:dynamodb:eu-west-1:00000000:table/ireland-1,}
   write-capacity=20
+The "magic" committer is supported
 
 S3A Client
   Endpoint: fs.s3a.endpoint=s3-eu-west-1.amazonaws.com

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
index cf7a2e4..75f8fa6 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/testing.md
@@ -629,6 +629,10 @@ Don't assume AWS S3 US-East only, do allow for working with external S3 implemen
 Those may be behind the latest S3 API features, not support encryption, session
 APIs, etc.
 
+They won't have the same CSV test files that some of the input tests rely on.
+Look at `ITestS3AInputStreamPerformance` to see how tests can be written
+to support the declaration of a specific large test file on alternate filesystems.
+
 
 ### Works Over Long-haul Links
 
@@ -662,6 +666,37 @@ listings, file status, ...), so help make failures easier to understand.
 At the very least, do not use `assertTrue()` or `assertFalse()` without
 including error messages.
 
+### Sets up its filesystem and checks for those settings
+
+Tests can override `createConfiguration()` to add new options to the configuration
+file for the S3A Filesystem instance used in their tests.
+
+However, filesystem caching may mean that a test suite may get a cached
+instance created with a different configuration. For tests which don't need
+specific configurations caching is good: it reduces test setup time.
+
+For those tests which do need unique options (encryption, magic files),
+things can break, and they will do so in hard-to-replicate ways.
+
+Use `S3ATestUtils.disableFilesystemCaching(conf)` to disable caching when
+modifying the config. As an example from `AbstractTestS3AEncryption`:
+
+```java
+@Override
+protected Configuration createConfiguration() {
+  Configuration conf = super.createConfiguration();
+  S3ATestUtils.disableFilesystemCaching(conf);
+  conf.set(Constants.SERVER_SIDE_ENCRYPTION_ALGORITHM,
+          getSSEAlgorithm().getMethod());
+  return conf;
+}
+```
+
+Then verify in the setup method or test cases that their filesystem actually has
+the desired feature (`fs.getConf().get(...)`). This not only
+catches filesystem reuse problems, it catches the situation where the
+filesystem configuration in `auth-keys.xml` has explicit per-bucket settings
+which override the test suite's general option settings.
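+
+As a sketch of the second situation, a per-bucket entry such as the following
+in `auth-keys.xml` would take precedence over the encryption algorithm a test
+suite sets in `createConfiguration()`. The bucket name is hypothetical; the
+`fs.s3a.bucket.` prefix is the per-bucket configuration mechanism described in
+the main S3A documentation.
+
+```xml
+<property>
+  <!-- "example-test-bucket" is a hypothetical bucket name -->
+  <name>fs.s3a.bucket.example-test-bucket.server-side-encryption-algorithm</name>
+  <value>SSE-KMS</value>
+</property>
+```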
 
 ### Cleans Up Afterwards
 
@@ -677,6 +712,31 @@ get called.
 
 We really appreciate this &mdash; you will too.
 
+### Runs in parallel unless this is unworkable.
+
+Tests must be designed to run in parallel with other tests, all working
+with the same shared S3 bucket. This means
+
+* Uses relative and JVM-fork-unique paths provided by the method
+  `AbstractFSContractTestBase.path(String filepath)`.
+* Doesn't manipulate the root directory or make assertions about its contents
+(for example: delete its contents and assert that it is now empty).
+* Doesn't have a specific requirement of all active clients of the bucket
+(example: SSE-C tests which require all files, even directory markers,
+to be encrypted with the same key).
+* Doesn't use so much bandwidth that all other tests will be starved of IO and
+start timing out (e.g. the scale tests).
+
+Tests such as these can only be run as sequential tests. When adding one,
+exclude it in the POM file from the parallel failsafe run and add it to the
+sequential one afterwards. The IO heavy ones must also be subclasses of
+`S3AScaleTestBase` and so only run if the system/maven property
+`fs.s3a.scale.test.enabled` is true.
+
+### Individual test cases can be run in an IDE
+
+This is invaluable for debugging test failures.
+
 
 ## <a name="tips"></a> Tips
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
index 619ffc1..7d16744 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
@@ -721,3 +721,127 @@ http.headers (LoggingManagedHttpClientConnection.java:onResponseReceived(127)) -
 http.headers (LoggingManagedHttpClientConnection.java:onResponseReceived(127)) - http-outgoing-0 << Server: AmazonS3
 execchain.MainClientExec (MainClientExec.java:execute(284)) - Connection can be kept alive for 60000 MILLISECONDS
 ```
+
+
+## <a name="retries"></a>  Reducing failures by configuring retry policy
+
+The S3A client can be configured to retry those operations which are considered
+retriable. That can be because they are idempotent, or
+because their failure happened before the request was processed by S3.
+
+The number of retries and interval between each retry can be configured:
+
+```xml
+<property>
+  <name>fs.s3a.attempts.maximum</name>
+  <value>20</value>
+  <description>How many times we should retry commands on transient errors,
+  excluding throttling errors.</description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.interval</name>
+  <value>500ms</value>
+  <description>
+    Interval between retry attempts.
+  </description>
+</property>
+```
+
+Not all failures are retried. Specifically excluded are those considered
+unrecoverable:
+
+* Low-level networking: `UnknownHostException`, `NoRouteToHostException`.
+* 302 redirects
+* Missing resources, 404/`FileNotFoundException`
+* HTTP 416 response/`EOFException`. This can surface if the length of a file changes
+  while another client is reading it.
+* Failures during execution or result processing of non-idempotent operations where
+it is considered likely that the operation has already taken place.
+
+In future, others may be added to this list.
+
+When one of these failures arises in the S3/S3A client, the retry mechanism
+is bypassed and the operation will fail.
+
+*Warning*: the S3A client considers DELETE, PUT and COPY operations to
+be idempotent, and will retry them on failure. These are only really idempotent
+if no other client is attempting to manipulate the same objects, such as:
+renaming the directory tree or uploading files to the same location.
+Please don't do that. Given that the emulated directory rename and delete operations
+aren't atomic, even without retries, multiple S3 clients working with the same
+paths can interfere with each other.
+
+#### <a name="throttling"></a> Throttling
+
+When many requests are made of a specific S3 bucket (or shard inside it),
+S3 will respond with a 503 "throttled" response.
+Throttling can be recovered from, provided overall load decreases.
+Furthermore, because it is sent before any changes are made to the object store,
+it is inherently idempotent. For this reason, the client will always attempt to
+retry throttled requests.
+
+The limit of the number of times a throttled request can be retried,
+and the exponential interval increase between attempts, can be configured
+independently of the other retry limits.
+
+```xml
+<property>
+  <name>fs.s3a.retry.throttle.limit</name>
+  <value>20</value>
+  <description>
+    Number of times to retry any throttled request.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3a.retry.throttle.interval</name>
+  <value>500ms</value>
+  <description>
+    Interval between retry attempts on throttled requests.
+  </description>
+</property>
+```
+
+If a client is failing due to `AWSServiceThrottledException` failures,
+increasing the interval and limit *may* address this. However,
+it is a sign of AWS services being overloaded by the sheer number of clients
+and rate of requests. Spreading data across different buckets, and/or using
+a more balanced directory structure may be beneficial.
+Consult [the AWS documentation](http://docs.aws.amazon.com/AmazonS3/latest/dev/request-rate-perf-considerations.html).
+
+Reading or writing data encrypted with SSE-KMS forces S3 to make calls of
+the AWS KMS Key Management Service, which comes with its own
+[Request Rate Limits](http://docs.aws.amazon.com/kms/latest/developerguide/limits.html).
+These default to 1200/second for an account, across all keys and all uses of
+them, which, for S3 means: across all buckets with data encrypted with SSE-KMS.
+
+###### Tips to Keep Throttling down
+
+* If you are seeing a lot of throttling responses on a large scale
+operation like a `distcp` copy, *reduce* the number of processes trying
+to work with the bucket (for distcp: reduce the number of mappers with the
+`-m` option).
+
+* If you are reading or writing lists of files, if you can randomize
+the list so they are not processed in a simple sorted order, you may
+reduce load on a specific shard of S3 data, so potentially increase throughput.
+
+* An S3 Bucket is throttled by requests coming from all
+simultaneous clients. Different applications and jobs may interfere with
+each other: consider that when troubleshooting.
+Partitioning data into different buckets may help isolate load here.
+
+* If you are using data encrypted with SSE-KMS, then the KMS request rate
+limits will also apply: these are stricter than the S3 numbers.
+If you believe that you are reaching these limits, you may be able to
+get them increased.
+Consult [the KMS Rate Limit documentation](http://docs.aws.amazon.com/kms/latest/developerguide/limits.html).
+
+* S3Guard uses DynamoDB for directory and file lookups;
+it is rate limited to the amount of (guaranteed) IO purchased for a
+table. If significant throttling events/rate is observed here, the preallocated
+IOPs can be increased with the `s3guard set-capacity` command, or
+through the AWS Console. Throttling events in S3Guard are noted in logs, and
+also in the S3A metrics `s3guard_metadatastore_throttle_rate` and
+`s3guard_metadatastore_throttled`.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractGetFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractGetFileStatus.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractGetFileStatus.java
index cb9819c..c2dc700 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractGetFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractGetFileStatus.java
@@ -21,12 +21,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
 import org.apache.hadoop.fs.contract.AbstractContractGetFileStatusTest;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3ATestConstants;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
 
 /**
  * S3A contract tests covering getFileStatus.
+ * Some of the tests can take too long when the fault injection rate is high,
+ * so the test timeout is extended.
  */
 public class ITestS3AContractGetFileStatus
     extends AbstractContractGetFileStatusTest {
@@ -52,4 +55,12 @@ public class ITestS3AContractGetFileStatus
     maybeEnableS3Guard(conf);
     return conf;
   }
+
+  /**
+   * {@inheritDoc}
+   * @return S3A test timeout.
+   */
+  protected int getTestTimeoutMillis() {
+    return S3ATestConstants.S3A_TEST_TIMEOUT;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
index 0c7f7df..03c91e6 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3AMockTest.java
@@ -26,6 +26,7 @@ import com.amazonaws.services.s3.AmazonS3;
 import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
 import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
 
@@ -62,10 +63,12 @@ public abstract class AbstractS3AMockTest {
     // test we don't issue request to AWS DynamoDB service.
     conf.setClass(S3_METADATA_STORE_IMPL, NullMetadataStore.class,
         MetadataStore.class);
+    // FS is always magic
+    conf.setBoolean(CommitConstants.MAGIC_COMMITTER_ENABLED, true);
     fs = new S3AFileSystem();
     URI uri = URI.create(FS_S3A + "://" + BUCKET);
     fs.initialize(uri, conf);
-    s3 = fs.getAmazonS3Client();
+    s3 = fs.getAmazonS3ClientForTesting("mocking");
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
index f0c389d..73e71f4 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/AbstractS3ATestBase.java
@@ -29,11 +29,13 @@ import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.maybeEnableS3Guard;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.MAGIC_COMMITTER_ENABLED;
 
 /**
  * An extension of the contract test base set up for S3A tests.
@@ -58,7 +60,11 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
 
   @Before
   public void nameThread() {
-    Thread.currentThread().setName("JUnit-" + methodName.getMethodName());
+    Thread.currentThread().setName("JUnit-" + getMethodName());
+  }
+
+  protected String getMethodName() {
+    return methodName.getMethodName();
   }
 
   @Override
@@ -75,6 +81,19 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
     Configuration conf = super.createConfiguration();
     // patch in S3Guard options
     maybeEnableS3Guard(conf);
+    // set hadoop temp dir to a default value
+    String testUniqueForkId =
+        System.getProperty(TEST_UNIQUE_FORK_ID);
+    String tmpDir = conf.get(Constants.HADOOP_TMP_DIR, "target/build/test");
+    if (testUniqueForkId != null) {
+      // patch temp dir for the specific branch
+      tmpDir = tmpDir + File.separatorChar + testUniqueForkId;
+      conf.set(Constants.HADOOP_TMP_DIR, tmpDir);
+    }
+    conf.set(Constants.BUFFER_DIR, tmpDir);
+    // add this so that even on tests where the FS is shared,
+    // the FS is always "magic"
+    conf.setBoolean(MAGIC_COMMITTER_ENABLED, true);
     return conf;
   }
 
@@ -98,7 +117,7 @@ public abstract class AbstractS3ATestBase extends AbstractFSContractTestBase
    */
   protected void describe(String text, Object... args) {
     LOG.info("\n\n{}: {}\n",
-        methodName.getMethodName(),
+        getMethodName(),
         String.format(text, args));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
index b38f191..f1799ac 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
@@ -110,7 +110,7 @@ public class ITestS3AConfiguration {
     } else {
       conf.set(Constants.ENDPOINT, endpoint);
       fs = S3ATestUtils.createTestFileSystem(conf);
-      AmazonS3 s3 = fs.getAmazonS3Client();
+      AmazonS3 s3 = fs.getAmazonS3ClientForTesting("test endpoint");
       String endPointRegion = "";
       // Differentiate handling of "s3-" and "s3." based endpoint identifiers
       String[] endpointParts = StringUtils.split(endpoint, '.');
@@ -378,7 +378,7 @@ public class ITestS3AConfiguration {
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
       assertNotNull(fs);
-      AmazonS3 s3 = fs.getAmazonS3Client();
+      AmazonS3 s3 = fs.getAmazonS3ClientForTesting("configuration");
       assertNotNull(s3);
       S3ClientOptions clientOptions = getField(s3, S3ClientOptions.class,
           "clientOptions");
@@ -402,7 +402,7 @@ public class ITestS3AConfiguration {
     conf = new Configuration();
     fs = S3ATestUtils.createTestFileSystem(conf);
     assertNotNull(fs);
-    AmazonS3 s3 = fs.getAmazonS3Client();
+    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("User Agent");
     assertNotNull(s3);
     ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
         "clientConfiguration");
@@ -416,7 +416,7 @@ public class ITestS3AConfiguration {
     conf.set(Constants.USER_AGENT_PREFIX, "MyApp");
     fs = S3ATestUtils.createTestFileSystem(conf);
     assertNotNull(fs);
-    AmazonS3 s3 = fs.getAmazonS3Client();
+    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("User agent");
     assertNotNull(s3);
     ClientConfiguration awsConf = getField(s3, ClientConfiguration.class,
         "clientConfiguration");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
index 8b7e031..bac1bb1 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEncryptionSSEC.java
@@ -222,11 +222,10 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     //unencrypted can access until the final directory
     unencryptedFileSystem.listFiles(pathA, true);
     unencryptedFileSystem.listFiles(pathAB, true);
-    AWSS3IOException ex = intercept(AWSS3IOException.class,
+    AWSBadRequestException ex = intercept(AWSBadRequestException.class,
         () -> {
           unencryptedFileSystem.listFiles(pathABC, false);
         });
-    assertStatusCode(ex, 400);
   }
 
   /**
@@ -270,11 +269,10 @@ public class ITestS3AEncryptionSSEC extends AbstractTestS3AEncryption {
     unencryptedFileSystem.listStatus(pathA);
     unencryptedFileSystem.listStatus(pathAB);
 
-    AWSS3IOException ex = intercept(AWSS3IOException.class,
+    intercept(AWSBadRequestException.class,
         () -> {
           unencryptedFileSystem.listStatus(pathABC);
         });
-    assertStatusCode(ex, 400);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
index 7d03668..5cd7379 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFailureHandling.java
@@ -23,9 +23,7 @@ import com.amazonaws.services.s3.model.MultiObjectDeleteException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.test.LambdaTestUtils;
 
 import org.junit.Assume;
 import org.junit.Test;
@@ -34,11 +32,12 @@ import org.slf4j.LoggerFactory;
 
 import java.io.EOFException;
 import java.io.FileNotFoundException;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.Callable;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.test.LambdaTestUtils.*;
 
 /**
  * Test S3A Failure translation, including a functional test
@@ -73,13 +72,9 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
       writeDataset(fs, testpath, shortDataset, shortDataset.length, 1024, true);
       // here the file length is less. Probe the file to see if this is true,
       // with a spin and wait
-      LambdaTestUtils.eventually(30 * 1000, 1000,
-          new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-              assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
-              return null;
-            }
+      eventually(30 * 1000, 1000,
+          () -> {
+            assertEquals(shortLen, fs.getFileStatus(testpath).getLen());
           });
 
       // here length is shorter. Assuming it has propagated to all replicas,
@@ -99,12 +94,8 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
           instream.read(instream.getPos(), buf, 0, buf.length));
 
       // now do a block read fully, again, backwards from the current pos
-      try {
-        instream.readFully(shortLen + 512, buf);
-        fail("Expected readFully to fail");
-      } catch (EOFException expected) {
-        LOG.debug("Expected EOF: ", expected);
-      }
+      intercept(EOFException.class, "", "readfully",
+          () -> instream.readFully(shortLen + 512, buf));
 
       assertIsEOF("read(offset)",
           instream.read(shortLen + 510, buf, 0, buf.length));
@@ -115,19 +106,10 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
       // delete the file. Reads must fail
       fs.delete(testpath, false);
 
-      try {
-        int r = instream.read();
-        fail("Expected an exception, got " + r);
-      } catch (FileNotFoundException e) {
-        // expected
-      }
-
-      try {
-        instream.readFully(2048, buf);
-        fail("Expected readFully to fail");
-      } catch (FileNotFoundException e) {
-        // expected
-      }
+      intercept(FileNotFoundException.class, "", "read()",
+          () -> instream.read());
+      intercept(FileNotFoundException.class, "", "readfully",
+          () -> instream.readFully(2048, buf));
 
     }
   }
@@ -149,7 +131,7 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
   }
 
   private void removeKeys(S3AFileSystem fileSystem, String... keys)
-      throws InvalidRequestException {
+      throws IOException {
     List<DeleteObjectsRequest.KeyVersion> request = new ArrayList<>(
         keys.length);
     for (String key : keys) {
@@ -168,14 +150,15 @@ public class ITestS3AFailureHandling extends AbstractS3ATestBase {
     timer.end("removeKeys");
   }
 
-  @Test(expected = MultiObjectDeleteException.class)
+  @Test
   public void testMultiObjectDeleteNoPermissions() throws Throwable {
     Configuration conf = getConfiguration();
     String csvFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
     Assume.assumeTrue("CSV test file is not the default",
         DEFAULT_CSVTEST_FILE.equals(csvFile));
     Path testFile = new Path(csvFile);
-    S3AFileSystem fs = (S3AFileSystem)FileSystem.newInstance(testFile.toUri(), conf);
-    removeKeys(fs, fs.pathToKey(testFile));
+    S3AFileSystem fs = (S3AFileSystem)testFile.getFileSystem(conf);
+    intercept(MultiObjectDeleteException.class,
+        () -> removeKeys(fs, fs.pathToKey(testFile)));
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
index 3e293f7..e56fdf8 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
@@ -203,6 +203,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     // before the internal behavior w/ or w/o metadata store.
     assumeFalse(fs.hasMetadataStore());
 
+    skipDuringFaultInjection(fs);
+
     Path srcBaseDir = path("src");
     mkdirs(srcBaseDir);
     MetricDiff deleteRequests =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
index 4abe2b7..44a2beb 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ATemporaryCredentials.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.fs.s3a;
 import java.io.IOException;
 import java.net.URI;
 
-import com.amazonaws.auth.AWSCredentials;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClient;
 import com.amazonaws.services.securitytoken.model.GetSessionTokenRequest;
@@ -30,6 +29,7 @@ import com.amazonaws.services.securitytoken.model.Credentials;
 
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.LambdaTestUtils;
 
 import org.junit.Test;
 
@@ -113,7 +113,7 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
       createAndVerifyFile(fs, path("testSTSInvalidToken"), TEST_FILE_SIZE);
       fail("Expected an access exception, but file access to "
           + fs.getUri() + " was allowed: " + fs);
-    } catch (AWSS3IOException ex) {
+    } catch (AWSS3IOException | AWSBadRequestException ex) {
       LOG.info("Expected Exception: {}", ex.toString());
       LOG.debug("Expected Exception: {}", ex, ex);
     }
@@ -125,14 +125,7 @@ public class ITestS3ATemporaryCredentials extends AbstractS3ATestBase {
     conf.set(ACCESS_KEY, "accesskey");
     conf.set(SECRET_KEY, "secretkey");
     conf.set(SESSION_TOKEN, "");
-    TemporaryAWSCredentialsProvider provider
-        = new TemporaryAWSCredentialsProvider(conf);
-    try {
-      AWSCredentials credentials = provider.getCredentials();
-      fail("Expected a CredentialInitializationException,"
-          + " got " + credentials);
-    } catch (CredentialInitializationException expected) {
-      // expected
-    }
+    LambdaTestUtils.intercept(CredentialInitializationException.class,
+        () -> new TemporaryAWSCredentialsProvider(conf).getCredentials());
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
index bc03a17..4a81374 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
 import com.amazonaws.services.s3.AmazonS3;
 import org.apache.hadoop.conf.Configuration;
@@ -52,6 +53,16 @@ import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
  */
 public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
 
+  private Invoker invoker;
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    invoker = new Invoker(new S3ARetryPolicy(getConfiguration()),
+        Invoker.NO_OP
+    );
+  }
+
   @Override
   protected AbstractFSContract createContract(Configuration conf) {
     conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
@@ -503,20 +514,15 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     }
     clearInconsistency(fs);
 
-    AmazonS3 client = fs.getAmazonS3Client();
     String key = fs.pathToKey(root) + "/";
 
-    ListObjectsV2Result preDeleteDelimited = client.listObjectsV2(
-        fs.createListObjectsRequest(key, "/").getV2());
-    ListObjectsV2Result preDeleteUndelimited = client.listObjectsV2(
-        fs.createListObjectsRequest(key, null).getV2());
+    ListObjectsV2Result preDeleteDelimited = listObjectsV2(fs, key, "/");
+    ListObjectsV2Result preDeleteUndelimited = listObjectsV2(fs, key, null);
 
     fs.delete(root, true);
 
-    ListObjectsV2Result postDeleteDelimited = client.listObjectsV2(
-        fs.createListObjectsRequest(key, "/").getV2());
-    ListObjectsV2Result postDeleteUndelimited = client.listObjectsV2(
-        fs.createListObjectsRequest(key, null).getV2());
+    ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/");
+    ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
 
     assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
             "in a non-recursive listing",
@@ -540,8 +546,27 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     );
   }
 
+  /**
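+   * Issue a V2 list request through the retrying invoker.
+   * @param fs filesystem to list
+   * @param key key prefix to list under
+   * @param delimiter delimiter; null for an undelimited listing
+   * @return the listing result
+   * @throws IOException if the listing fails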
+   */
+
+  private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
+      String key, String delimiter) throws java.io.IOException {
+    ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)
+        .getV2();
+    return invoker.retryUntranslated("list", true,
+        () -> {
+          return fs.getAmazonS3ClientForTesting("list").listObjectsV2(k);
+        });
+  }
+
   private static void clearInconsistency(S3AFileSystem fs) throws Exception {
-    AmazonS3 s3 = fs.getAmazonS3Client();
+    AmazonS3 s3 = fs.getAmazonS3ClientForTesting("s3guard");
     InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
     ic.clearInconsistency();
   }



