hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [15/36] hadoop git commit: HADOOP-13786 Add S3A committer for zero-rename commits to S3 endpoints. Contributed by Steve Loughran and Ryan Blue.
Date Tue, 28 Nov 2017 21:36:50 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java
new file mode 100644
index 0000000..13e1c61
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPathOutputCommitterFactory.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.lib.output;
+
+import java.io.IOException;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+
+import static org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory.*;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test the committer factory logic, looking at the override
+ * and fallback behavior.
+ */
+@SuppressWarnings("unchecked")
+public class TestPathOutputCommitterFactory extends Assert {
+
+  private static final String HTTP_COMMITTER_FACTORY = String.format(
+      COMMITTER_FACTORY_SCHEME_PATTERN, "http");
+
+  private static final Path HTTP_PATH = new Path("http://hadoop.apache.org/");
+  private static final Path HDFS_PATH = new Path("hdfs://localhost:8081/");
+
+  private TaskAttemptID taskAttemptID =
+      new TaskAttemptID("local", 0, TaskType.MAP, 1, 2);
+
+  /**
+   * Set a factory for a schema, verify it works.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFactoryForSchema() throws Throwable {
+    createCommitterFactory(SimpleCommitterFactory.class,
+        HTTP_PATH,
+        newBondedConfiguration());
+  }
+
+  /**
+   * A schema factory only affects that filesystem.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFactoryFallbackDefault() throws Throwable {
+    createCommitterFactory(FileOutputCommitterFactory.class,
+        HDFS_PATH,
+        newBondedConfiguration());
+  }
+
+  /**
+   * A schema factory only affects that filesystem; test through
+   * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}.
+   * @throws Throwable failure
+   */
+  @Test
+  public void testCommitterFallbackDefault() throws Throwable {
+    createCommitter(FileOutputCommitter.class,
+        HDFS_PATH,
+        taskAttempt(newBondedConfiguration()));
+  }
+
+  /**
+   * Verify that you can override any schema with an explicit name.
+   */
+  @Test
+  public void testCommitterFactoryOverride() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    // set up for the schema factory
+    // and then set a global one which overrides the others.
+    conf.set(COMMITTER_FACTORY_CLASS, OtherFactory.class.getName());
+    createCommitterFactory(OtherFactory.class, HDFS_PATH, conf);
+    createCommitterFactory(OtherFactory.class, HTTP_PATH, conf);
+  }
+
+  /**
+   * Verify that if the factory class option is "", schema factory
+   * resolution still works.
+   */
+  @Test
+  public void testCommitterFactoryEmptyOption() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    // set up for the schema factory
+    // and then set the global option to "", which must not override it.
+    conf.set(COMMITTER_FACTORY_CLASS, "");
+    createCommitterFactory(SimpleCommitterFactory.class, HTTP_PATH, conf);
+
+    // and HDFS, with no schema, falls back to the default
+    createCommitterFactory(FileOutputCommitterFactory.class, HDFS_PATH, conf);
+  }
+
+  /**
+   * Verify that if the committer factory class is unknown, you cannot
+   * create committers.
+   */
+  @Test
+  public void testCommitterFactoryUnknown() throws Throwable {
+    Configuration conf = new Configuration();
+    // set the factory to an unknown class
+    conf.set(COMMITTER_FACTORY_CLASS, "unknown");
+    intercept(RuntimeException.class,
+        () -> getCommitterFactory(HDFS_PATH, conf));
+  }
+
+  /**
+   * Verify that if the committer output path is null, you get back
+   * a FileOutputCommitter with null output & work paths.
+   */
+  @Test
+  public void testCommitterNullOutputPath() throws Throwable {
+    // bind the http schema to the simple factory
+    Configuration conf = newBondedConfiguration();
+    // then ask committers for a null path
+    FileOutputCommitter committer = createCommitter(
+        FileOutputCommitterFactory.class,
+        FileOutputCommitter.class,
+        null, conf);
+    assertNull(committer.getOutputPath());
+    assertNull(committer.getWorkPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer, that takes priority
+   * over any filesystem committer.
+   */
+  @Test
+  public void testNamedCommitterFactory() throws Throwable {
+    Configuration conf = new Configuration();
+    // select the named committer factory and name its committer class
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    SimpleCommitter sc = createCommitter(
+        NamedCommitterFactory.class,
+        SimpleCommitter.class, HDFS_PATH, conf);
+    assertEquals("Wrong output path from " + sc,
+        HDFS_PATH,
+        sc.getOutputPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer and there's no
+   * path, the committer is picked up.
+   */
+  @Test
+  public void testNamedCommitterFactoryNullPath() throws Throwable {
+    Configuration conf = new Configuration();
+    // select the named committer factory and name its committer class
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    SimpleCommitter sc = createCommitter(
+        NamedCommitterFactory.class,
+        SimpleCommitter.class,
+        null, conf);
+    assertNull(sc.getOutputPath());
+  }
+
+  /**
+   * Verify that if you explicitly name a committer and there's no path, the
+   * committer is picked up when created through a task attempt context.
+   */
+  @Test
+  public void testNamedCommitterNullPath() throws Throwable {
+    Configuration conf = new Configuration();
+    // select the named committer factory and name its committer class
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+
+    SimpleCommitter sc = createCommitter(
+        SimpleCommitter.class,
+        null, taskAttempt(conf));
+    assertNull(sc.getOutputPath());
+  }
+
+  /**
+   * Create a factory then a committer, validating the type of both.
+   * @param <T> type of factory
+   * @param <U> type of committer
+   * @param factoryClass expected factory class
+   * @param committerClass expected committer class
+   * @param path output path (may be null)
+   * @param conf configuration
+   * @return the committer
+   * @throws IOException failure to create
+   */
+  private <T extends PathOutputCommitterFactory, U extends PathOutputCommitter>
+      U createCommitter(Class<T> factoryClass,
+      Class<U> committerClass,
+      Path path,
+      Configuration conf) throws IOException {
+    T f = createCommitterFactory(factoryClass, path, conf);
+    PathOutputCommitter committer = f.createOutputCommitter(path,
+        taskAttempt(conf));
+    assertEquals(" Wrong committer for path " + path + " from factory " + f,
+        committerClass, committer.getClass());
+    return (U) committer;
+  }
+
+  /**
+   * Create a committer from a task context, via
+   * {@link PathOutputCommitterFactory#createCommitter(Path, TaskAttemptContext)}.
+   * @param <U> type of committer
+   * @param committerClass expected committer class
+   * @param path output path (may be null)
+   * @param context task attempt context
+   * @return the committer
+   * @throws IOException failure to create
+   */
+  private <U extends PathOutputCommitter> U createCommitter(
+      Class<U> committerClass,
+      Path path,
+      TaskAttemptContext context) throws IOException {
+    PathOutputCommitter committer = PathOutputCommitterFactory
+        .createCommitter(path, context);
+    assertEquals(" Wrong committer for path " + path,
+        committerClass, committer.getClass());
+    return (U) committer;
+  }
+
+  /**
+   * Create a committer factory for a path, validating its type.
+   * @param factoryClass expected factory class
+   * @param path output path (may be null)
+   * @param conf configuration
+   * @param <T> type of factory
+   * @return the factory
+   */
+  private <T extends PathOutputCommitterFactory> T createCommitterFactory(
+      Class<T> factoryClass,
+      Path path,
+      Configuration conf) {
+    PathOutputCommitterFactory factory = getCommitterFactory(path, conf);
+    assertEquals(" Wrong factory for path " + path,
+        factoryClass, factory.getClass());
+    return (T)factory;
+  }
+
+  /**
+   * Create a new task attempt context.
+   * @param conf config
+   * @return a new context
+   */
+  private TaskAttemptContext taskAttempt(Configuration conf) {
+    return new TaskAttemptContextImpl(conf, taskAttemptID);
+  }
+
+  /**
+   * Verify that if the file output committer factory is explicitly
+   * selected, it is used even when a named committer class is also set.
+   */
+  @Test
+  public void testFileOutputCommitterFactory() throws Throwable {
+    Configuration conf = new Configuration();
+    // explicitly select the file committer factory
+    conf.set(COMMITTER_FACTORY_CLASS, FILE_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    getCommitterFactory(HDFS_PATH, conf);
+    createCommitter(
+        FileOutputCommitterFactory.class,
+        FileOutputCommitter.class, null, conf);
+  }
+
+  /**
+   * Follow the entire committer chain down and create a new committer from
+   * the output format.
+   * @throws Throwable on a failure.
+   */
+  @Test
+  public void testFileOutputFormatBinding() throws Throwable {
+    Configuration conf = newBondedConfiguration();
+    conf.set(FileOutputFormat.OUTDIR, HTTP_PATH.toUri().toString());
+    TextOutputFormat<String, String> off = new TextOutputFormat<>();
+    SimpleCommitter committer = (SimpleCommitter)
+        off.getOutputCommitter(taskAttempt(conf));
+    assertEquals("Wrong output path from "+ committer,
+        HTTP_PATH,
+        committer.getOutputPath());
+  }
+
+  /**
+   * Follow the entire committer chain down and create a new committer from
+   * the output format when no output directory is set.
+   * @throws Throwable on a failure.
+   */
+  @Test
+  public void testFileOutputFormatBindingNoPath() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.unset(FileOutputFormat.OUTDIR);
+    // set up for the schema factory
+    conf.set(COMMITTER_FACTORY_CLASS, NAMED_COMMITTER_FACTORY);
+    conf.set(NAMED_COMMITTER_CLASS, SimpleCommitter.class.getName());
+    httpToSimpleFactory(conf);
+    TextOutputFormat<String, String> off = new TextOutputFormat<>();
+    SimpleCommitter committer = (SimpleCommitter)
+        off.getOutputCommitter(taskAttempt(conf));
+    assertNull("Output path from "+ committer,
+        committer.getOutputPath());
+  }
+
+  /**
+   * Bind the http schema CommitterFactory to {@link SimpleCommitterFactory}.
+   * @param conf config to patch; the same instance is returned
+   */
+  private Configuration httpToSimpleFactory(Configuration conf) {
+    conf.set(HTTP_COMMITTER_FACTORY, SimpleCommitterFactory.class.getName());
+    return conf;
+  }
+
+
+  /**
+   * Create a configuration with the http schema bonded to the simple factory.
+   * @return a new, patched configuration
+   */
+  private Configuration newBondedConfiguration() {
+    return httpToSimpleFactory(new Configuration());
+  }
+
+  /**
+   * Extract the (mandatory) cause of an exception.
+   * @param ex exception
+   * @param clazz expected class
+   * @return the cause, which will be of the expected type
+   * @throws AssertionError if there is a problem
+   */
+  private <E extends Throwable> E verifyCauseClass(Throwable ex,
+      Class<E> clazz) throws AssertionError {
+    Throwable cause = ex.getCause();
+    if (cause == null) {
+      throw new AssertionError("No cause", ex);
+    }
+    if (!cause.getClass().equals(clazz)) {
+      throw new AssertionError("Wrong cause class", cause);
+    }
+    return (E)cause;
+  }
+
+  @Test
+  public void testBadCommitterFactory() throws Throwable {
+    expectFactoryConstructionFailure(HTTP_COMMITTER_FACTORY);
+  }
+
+  @Test
+  public void testBoundCommitterWithSchema() throws Throwable {
+    // this verifies that a bound committer relays to the underlying committer
+    Configuration conf = newBondedConfiguration();
+    TestPathOutputCommitter.TaskContext tac
+        = new TestPathOutputCommitter.TaskContext(conf);
+    BindingPathOutputCommitter committer
+        = new BindingPathOutputCommitter(HTTP_PATH, tac);
+    intercept(IOException.class, "setupJob",
+        () -> committer.setupJob(tac));
+  }
+
+  @Test
+  public void testBoundCommitterWithDefault() throws Throwable {
+    // this verifies that a bound committer relays to the underlying committer
+    Configuration conf = newBondedConfiguration();
+    TestPathOutputCommitter.TaskContext tac
+        = new TestPathOutputCommitter.TaskContext(conf);
+    BindingPathOutputCommitter committer
+        = new BindingPathOutputCommitter(HDFS_PATH, tac);
+    assertEquals(FileOutputCommitter.class,
+        committer.getCommitter().getClass());
+  }
+
+  /**
+   * Set the specific key to a string which is not a factory class; expect
+   * a failure.
+   * @param key key to set
+   * @throws Throwable on a failure
+   */
+  @SuppressWarnings("ThrowableNotThrown")
+  protected void expectFactoryConstructionFailure(String key) throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(key, "Not a factory");
+    RuntimeException ex = intercept(RuntimeException.class,
+        () -> getCommitterFactory(HTTP_PATH, conf));
+    verifyCauseClass(
+        verifyCauseClass(ex, RuntimeException.class),
+        ClassNotFoundException.class);
+  }
+
+  /**
+   * A simple committer.
+   */
+  public static final class SimpleCommitter extends PathOutputCommitter {
+
+    private final Path outputPath;
+
+    public SimpleCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      super(outputPath, context);
+      this.outputPath = outputPath;
+    }
+
+    @Override
+    public Path getWorkPath() throws IOException {
+      return null;
+    }
+
+    /**
+     * Job setup throws an exception.
+     * @param jobContext Context of the job
+     * @throws IOException always
+     */
+    @Override
+    public void setupJob(JobContext jobContext) throws IOException {
+      throw new IOException("setupJob");
+    }
+
+    @Override
+    public void setupTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public boolean needsTaskCommit(TaskAttemptContext taskContext)
+        throws IOException {
+      return false;
+    }
+
+    @Override
+    public void commitTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public void abortTask(TaskAttemptContext taskContext) throws IOException {
+
+    }
+
+    @Override
+    public Path getOutputPath() {
+      return outputPath;
+    }
+  }
+
+  /**
+   * The simple committer factory.
+   */
+  private static class SimpleCommitterFactory
+      extends PathOutputCommitterFactory {
+
+    @Override
+    public PathOutputCommitter createOutputCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      return new SimpleCommitter(outputPath, context);
+    }
+
+  }
+
+  /**
+   * Some other factory.
+   */
+  private static class OtherFactory extends PathOutputCommitterFactory {
+
+    /**
+     * {@inheritDoc}
+     * @param outputPath output path. This may be null.
+     * @param context context
+     * @return a new {@link SimpleCommitter}
+     * @throws IOException failure to create the committer
+     */
+    @Override
+    public PathOutputCommitter createOutputCommitter(Path outputPath,
+        TaskAttemptContext context) throws IOException {
+      return new SimpleCommitter(outputPath, context);
+    }
+
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/pom.xml b/hadoop-tools/hadoop-aws/pom.xml
index 97ceddf..748537c 100644
--- a/hadoop-tools/hadoop-aws/pom.xml
+++ b/hadoop-tools/hadoop-aws/pom.xml
@@ -129,7 +129,9 @@
                 <!-- surefire.forkNumber won't do the parameter -->
                 <!-- substitution.  Putting a prefix in front of it like -->
                 <!-- "fork-" makes it work. -->
-                <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                <!-- Important: Those leading 0s are needed to guarantee that -->
+                <!-- trailing three chars are always numeric and unique -->
+                <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                 <!-- Propagate scale parameters -->
                 <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
                 <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
@@ -165,7 +167,7 @@
                     <!-- surefire.forkNumber won't do the parameter -->
                     <!-- substitution.  Putting a prefix in front of it like -->
                     <!-- "fork-" makes it work. -->
-                    <test.unique.fork.id>fork-${surefire.forkNumber}</test.unique.fork.id>
+                    <test.unique.fork.id>fork-000${surefire.forkNumber}</test.unique.fork.id>
                     <!-- Propagate scale parameters -->
                     <fs.s3a.scale.test.enabled>${fs.s3a.scale.test.enabled}</fs.s3a.scale.test.enabled>
                     <fs.s3a.scale.test.huge.filesize>${fs.s3a.scale.test.huge.filesize}</fs.s3a.scale.test.huge.filesize>
@@ -192,7 +194,6 @@
                     <include>**/ITest*.java</include>
                   </includes>
                   <excludes>
-                    <exclude>**/ITestJets3tNativeS3FileSystemContract.java</exclude>
                     <exclude>**/ITestS3AContractRootDir.java</exclude>
                     <exclude>**/ITestS3AFileContextStatistics.java</exclude>
                     <exclude>**/ITestS3AEncryptionSSEC*.java</exclude>
@@ -225,7 +226,6 @@
                   <!-- Do a sequential run for tests that cannot handle -->
                   <!-- parallel execution. -->
                   <includes>
-                    <include>**/ITestJets3tNativeS3FileSystemContract.java</include>
                     <include>**/ITestS3AContractRootDir.java</include>
                     <include>**/ITestS3AFileContextStatistics.java</include>
                     <include>**/ITestS3AHuge*.java</include>
@@ -465,8 +465,8 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-      <scope>test</scope>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -476,12 +476,23 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+       <artifactId>hadoop-mapreduce-client-hs</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-examples</artifactId>
       <scope>test</scope>
       <type>jar</type>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-distcp</artifactId>
       <scope>test</scope>
     </dependency>
@@ -491,5 +502,28 @@
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <!-- artifacts needed to bring up a Mini MR Yarn cluster-->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-app</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+      <scope>test</scope>
+      <type>test-jar</type>
+    </dependency>
   </dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java
new file mode 100644
index 0000000..482c5a1
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSBadRequestException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * A 400 "Bad Request" exception was received.
+ * This is the general "bad parameters, headers, whatever" failure.
+ */
+public class AWSBadRequestException extends AWSServiceIOException {
+  /**
+   * HTTP status code which signals this failure mode was triggered: {@value}.
+   */
+  public static final int STATUS_CODE = 400;
+
+  /**
+   * Instantiate.
+   * @param operation operation which triggered this
+   * @param cause the underlying cause
+   */
+  public AWSBadRequestException(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
index a8c01cb..22afb01 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSClientIOException.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
+import com.amazonaws.SdkBaseException;
 import com.google.common.base.Preconditions;
 
 import java.io.IOException;
@@ -31,7 +32,7 @@ public class AWSClientIOException extends IOException {
   private final String operation;
 
   public AWSClientIOException(String operation,
-      AmazonClientException cause) {
+      SdkBaseException cause) {
     super(cause);
     Preconditions.checkArgument(operation != null, "Null 'operation' argument");
     Preconditions.checkArgument(cause != null, "Null 'cause' argument");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
new file mode 100644
index 0000000..e6a23b2
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSNoResponseException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * Status code 443, no response from server. This is considered idempotent.
+ */
+public class AWSNoResponseException extends AWSServiceIOException {
+  public AWSNoResponseException(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java
new file mode 100644
index 0000000..bb337ee
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSRedirectException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * Request is redirected.
+ * If this gets as far as the user, it's unrecoverable.
+ */
+public class AWSRedirectException extends AWSServiceIOException {
+
+  /**
+   * Instantiate.
+   * @param operation operation which triggered this
+   * @param cause the underlying cause
+   */
+  public AWSRedirectException(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java
new file mode 100644
index 0000000..131cea7
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSServiceThrottledException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * Exception raised when a service was throttled.
+ */
+public class AWSServiceThrottledException extends AWSServiceIOException {
+
+  /**
+   * HTTP status code which signals this failure mode was triggered: {@value}.
+   */
+  public static final int STATUS_CODE = 503;
+
+  /**
+   * Instantiate.
+   * @param operation operation which triggered this
+   * @param cause the underlying cause
+   */
+  public AWSServiceThrottledException(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java
new file mode 100644
index 0000000..83be294
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AWSStatus500Exception.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonServiceException;
+
+/**
+ * A 500 response came back from a service.
+ * This is considered <i>probably</i> retriable. That is, we assume
+ * <ol>
+ *   <li>whatever error happened in the service itself to have happened
+ *    before the infrastructure committed the operation.</li>
+ *    <li>Nothing else got through either.</li>
+ * </ol>
+ */
+public class AWSStatus500Exception extends AWSServiceIOException {
+  public AWSStatus500Exception(String operation,
+      AmazonServiceException cause) {
+    super(operation, cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
index 5b25730..f13942d 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java
@@ -45,7 +45,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 final class BlockingThreadPoolExecutorService
     extends SemaphoredDelegatingExecutor {
 
-  private static Logger LOG = LoggerFactory
+  private static final Logger LOG = LoggerFactory
       .getLogger(BlockingThreadPoolExecutorService.class);
 
   private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index d278bdf..e6b2bdb 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -35,6 +35,11 @@ public final class Constants {
   private Constants() {
   }
 
+  /**
+   * default hadoop temp dir on local system: {@value}.
+   */
+  public static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
+
   /** The minimum multipart size which S3 supports. */
   public static final int MULTIPART_MIN_SIZE = 5 * 1024 * 1024;
 
@@ -328,14 +333,6 @@ public final class Constants {
   @InterfaceAudience.Private
   public static final int MAX_MULTIPART_COUNT = 10000;
 
-  /**
-   * Classname of the S3A-specific output committer factory. This
-   * is what must be declared when attempting to use
-   */
-  @InterfaceStability.Unstable
-  public static final String S3A_OUTPUT_COMMITTER_FACTORY =
-      "org.apache.hadoop.fs.s3a.commit.S3AOutputCommitterFactory";
-
   /* Constants. */
   public static final String S3_METADATA_STORE_IMPL =
       "fs.s3a.metadatastore.impl";
@@ -411,13 +408,6 @@ public final class Constants {
   public static final int S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT = 25;
 
   /**
-   * V1 committer.
-   */
-  @InterfaceStability.Unstable
-  public static final String S3A_OUTPUT_COMMITTER_MRV1 =
-      "org.apache.hadoop.fs.s3a.commit.S3OutputCommitterMRv1";
-
-  /**
    * The default "Null" metadata store: {@value}.
    */
   @InterfaceStability.Unstable
@@ -463,4 +453,56 @@ public final class Constants {
   @InterfaceStability.Unstable
   public static final int DEFAULT_LIST_VERSION = 2;
 
+  @InterfaceStability.Unstable
+  public static final String FAIL_INJECT_THROTTLE_PROBABILITY =
+      "fs.s3a.failinject.throttle.probability";
+
+  @InterfaceStability.Unstable
+  public static final String FAIL_INJECT_CLIENT_FACTORY =
+      "org.apache.hadoop.fs.s3a.InconsistentS3ClientFactory";
+
+  /**
+   * Number of times to retry any repeatable S3 client request on failure,
+   * excluding throttling requests: {@value}.
+   */
+  public static final String RETRY_LIMIT = "fs.s3a.retry.limit";
+
+  /**
+   * Default retry limit: {@value}.
+   */
+  public static final int RETRY_LIMIT_DEFAULT = DEFAULT_MAX_ERROR_RETRIES;
+
+  /**
+   * Interval between retry attempts: {@value}.
+   */
+  public static final String RETRY_INTERVAL = "fs.s3a.retry.interval";
+
+  /**
+   * Default retry interval: {@value}.
+   */
+  public static final String RETRY_INTERVAL_DEFAULT = "500ms";
+
+  /**
+   * Number of times to retry any throttled request: {@value}.
+   */
+  public static final String RETRY_THROTTLE_LIMIT =
+      "fs.s3a.retry.throttle.limit";
+
+  /**
+   * Default throttled retry limit: {@value}.
+   */
+  public static final int RETRY_THROTTLE_LIMIT_DEFAULT =
+      DEFAULT_MAX_ERROR_RETRIES;
+
+  /**
+   * Interval between retry attempts on throttled requests: {@value}.
+   */
+  public static final String RETRY_THROTTLE_INTERVAL =
+      "fs.s3a.retry.throttle.interval";
+
+  /**
+   * Default throttled retry interval: {@value}.
+   */
+  public static final String RETRY_THROTTLE_INTERVAL_DEFAULT = "500ms";
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
index 6476f5d..d158061 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java
@@ -18,38 +18,50 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
+import com.amazonaws.SdkClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.s3.AmazonS3;
 import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
 import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsResult;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
+import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsV2Request;
 import com.amazonaws.services.s3.model.ListObjectsV2Result;
+import com.amazonaws.services.s3.model.MultipartUploadListing;
 import com.amazonaws.services.s3.model.ObjectListing;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.PutObjectResult;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-
 /**
  * A wrapper around {@link com.amazonaws.services.s3.AmazonS3} that injects
  * inconsistency and/or errors.  Used for testing S3Guard.
@@ -88,6 +100,21 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   private long delayKeyMsec;
 
   /**
+   * Probability of throttling a request.
+   */
+  private float throttleProbability;
+
+  /**
+   * Counter of failures since last reset.
+   */
+  private final AtomicLong failureCounter = new AtomicLong(0);
+
+  /**
+   * limit for failures before operations succeed; if 0 then "no limit".
+   */
+  private int failureLimit = 0;
+
+  /**
    * Composite of data we need to track about recently deleted objects:
    * when it was deleted (same was with recently put objects) and the object
    * summary (since we should keep returning it for sometime after its
@@ -134,12 +161,25 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     if (delayKeySubstring.equals(MATCH_ALL_KEYS)) {
       delayKeySubstring = "";
     }
-    delayKeyProbability = conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
-        DEFAULT_DELAY_KEY_PROBABILITY);
+    delayKeyProbability = validProbability(
+        conf.getFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY,
+            DEFAULT_DELAY_KEY_PROBABILITY));
     delayKeyMsec = conf.getLong(FAIL_INJECT_INCONSISTENCY_MSEC,
         DEFAULT_DELAY_KEY_MSEC);
-    LOG.info("Enabled with {} msec delay, substring {}, probability {}",
-        delayKeyMsec, delayKeySubstring, delayKeyProbability);
+    setThrottleProbability(conf.getFloat(FAIL_INJECT_THROTTLE_PROBABILITY,
+        0.0f));
+    LOG.info("{}", this);
+  }
+
+  @Override
+  public String toString() {
+    return String.format(
+        "Inconsistent S3 Client with"
+            + " %s msec delay, substring %s, delay probability %s;"
+            + " throttle probability %s"
+            + "; failure limit %d, failure count %d",
+        delayKeyMsec, delayKeySubstring, delayKeyProbability,
+        throttleProbability, failureLimit, failureCounter.get());
   }
 
   /**
@@ -174,10 +214,11 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   public DeleteObjectsResult deleteObjects(DeleteObjectsRequest
       deleteObjectsRequest)
       throws AmazonClientException, AmazonServiceException {
+    maybeFail();
     for (DeleteObjectsRequest.KeyVersion keyVersion :
         deleteObjectsRequest.getKeys()) {
-      registerDeleteObject(keyVersion.getKey(), deleteObjectsRequest
-          .getBucketName());
+      registerDeleteObject(keyVersion.getKey(),
+          deleteObjectsRequest.getBucketName());
     }
     return super.deleteObjects(deleteObjectsRequest);
   }
@@ -187,6 +228,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
       throws AmazonClientException, AmazonServiceException {
     String key = deleteObjectRequest.getKey();
     LOG.debug("key {}", key);
+    maybeFail();
     registerDeleteObject(key, deleteObjectRequest.getBucketName());
     super.deleteObject(deleteObjectRequest);
   }
@@ -196,6 +238,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   public PutObjectResult putObject(PutObjectRequest putObjectRequest)
       throws AmazonClientException, AmazonServiceException {
     LOG.debug("key {}", putObjectRequest.getKey());
+    maybeFail();
     registerPutObject(putObjectRequest);
     return super.putObject(putObjectRequest);
   }
@@ -204,6 +247,20 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   @Override
   public ObjectListing listObjects(ListObjectsRequest listObjectsRequest)
       throws AmazonClientException, AmazonServiceException {
+    maybeFail();
+    return innerlistObjects(listObjectsRequest);
+  }
+
+  /**
+   * Run the list object call without any failure probability.
+   * This stops a very aggressive failure rate from completely overloading
+   * the retry logic.
+   * @param listObjectsRequest request
+   * @return listing
+   * @throws AmazonClientException failure
+   */
+  private ObjectListing innerlistObjects(ListObjectsRequest listObjectsRequest)
+      throws AmazonClientException, AmazonServiceException {
     LOG.debug("prefix {}", listObjectsRequest.getPrefix());
     ObjectListing listing = super.listObjects(listObjectsRequest);
     listing = filterListObjects(listing);
@@ -215,6 +272,16 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   @Override
   public ListObjectsV2Result listObjectsV2(ListObjectsV2Request request)
       throws AmazonClientException, AmazonServiceException {
+    maybeFail();
+    return innerListObjectsV2(request);
+  }
+
+  /**
+   * Non failing V2 list object request.
+   * @param request request
+   * @return result.
+   */
+  private ListObjectsV2Result innerListObjectsV2(ListObjectsV2Request request) {
     LOG.debug("prefix {}", request.getPrefix());
     ListObjectsV2Result listing = super.listObjectsV2(request);
     listing = filterListObjectsV2(listing);
@@ -222,17 +289,13 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     return listing;
   }
 
-
   private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
       S3ObjectSummary item) {
     // Behavior of S3ObjectSummary
     String key = item.getKey();
-    for (S3ObjectSummary member : list) {
-      if (member.getKey().equals(key)) {
-        return;
-      }
+    if (list.stream().noneMatch((member) -> member.getKey().equals(key))) {
+      list.add(item);
     }
-    list.add(item);
   }
 
   /**
@@ -396,13 +459,9 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   }
 
   private List<String> filterPrefixes(List<String> prefixes) {
-    List<String> outputPrefixes = new ArrayList<>();
-    for (String key : prefixes) {
-      if (!isKeyDelayed(delayedPutKeys.get(key), key)) {
-        outputPrefixes.add(key);
-      }
-    }
-    return outputPrefixes;
+    return prefixes.stream()
+        .filter(key -> !isKeyDelayed(delayedPutKeys.get(key), key))
+        .collect(Collectors.toList());
   }
 
   private boolean isKeyDelayed(Long enqueueTime, String key) {
@@ -425,14 +484,14 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
   private void registerDeleteObject(String key, String bucket) {
     if (shouldDelay(key)) {
       // Record summary so we can add it back for some time post-deletion
-      S3ObjectSummary summary = null;
-      ObjectListing list = listObjects(bucket, key);
-      for (S3ObjectSummary result : list.getObjectSummaries()) {
-        if (result.getKey().equals(key)) {
-          summary = result;
-          break;
-        }
-      }
+      ListObjectsRequest request = new ListObjectsRequest()
+              .withBucketName(bucket)
+              .withPrefix(key);
+      S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries()
+          .stream()
+          .filter(result -> result.getKey().equals(key))
+          .findFirst()
+          .orElse(null);
       delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
     }
   }
@@ -471,7 +530,109 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     delayedPutKeys.put(key, System.currentTimeMillis());
   }
 
+  @Override
+  public CompleteMultipartUploadResult completeMultipartUpload(
+      CompleteMultipartUploadRequest completeMultipartUploadRequest)
+      throws SdkClientException, AmazonServiceException {
+    maybeFail();
+    return super.completeMultipartUpload(completeMultipartUploadRequest);
+  }
+
+  @Override
+  public UploadPartResult uploadPart(UploadPartRequest uploadPartRequest)
+      throws SdkClientException, AmazonServiceException {
+    maybeFail();
+    return super.uploadPart(uploadPartRequest);
+  }
+
+  @Override
+  public InitiateMultipartUploadResult initiateMultipartUpload(
+      InitiateMultipartUploadRequest initiateMultipartUploadRequest)
+      throws SdkClientException, AmazonServiceException {
+    maybeFail();
+    return super.initiateMultipartUpload(initiateMultipartUploadRequest);
+  }
+
+  @Override
+  public MultipartUploadListing listMultipartUploads(
+      ListMultipartUploadsRequest listMultipartUploadsRequest)
+      throws SdkClientException, AmazonServiceException {
+    maybeFail();
+    return super.listMultipartUploads(listMultipartUploadsRequest);
+  }
+
+  public float getDelayKeyProbability() {
+    return delayKeyProbability;
+  }
+
+  public long getDelayKeyMsec() {
+    return delayKeyMsec;
+  }
+
+  /**
+   * Get the probability of the request being throttled.
+   * @return a value 0 - 1.0f.
+   */
+  public float getThrottleProbability() {
+    return throttleProbability;
+  }
+
+  /**
+   * Set the probability of throttling a request.
+   * @param throttleProbability the probability of a request being throttled.
+   */
+  public void setThrottleProbability(float throttleProbability) {
+    this.throttleProbability = validProbability(throttleProbability);
+  }
+
+  /**
+   * Validate a probability option.
+   * @param p probability
+   * @return the probability, if valid
+   * @throws IllegalArgumentException if the probability is out of range.
+   */
+  private float validProbability(float p) {
+    Preconditions.checkArgument(p >= 0.0f && p <= 1.0f,
+        "Probability out of range 0 to 1 %s", p);
+    return p;
+  }
+
+  /**
+   * Conditionally fail the operation.
+   * @throws AmazonClientException if the client chooses to fail
+   * the request.
+   */
+  private void maybeFail() throws AmazonClientException {
+    // code structure here is to line up for more failures later
+    AmazonServiceException ex = null;
+    if (trueWithProbability(throttleProbability)) {
+      // throttle the request
+      ex = new AmazonServiceException("throttled"
+          + " count = " + (failureCounter.get() + 1), null);
+      ex.setStatusCode(503);
+    }
+
+    if (ex != null) {
+      long count = failureCounter.incrementAndGet();
+      if (failureLimit == 0
+          || (failureLimit > 0 && count < failureLimit)) {
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * Set the limit on failures before all operations pass through.
+   * This resets the failure count.
+   * @param limit limit; "0" means "no limit"
+   */
+  public void setFailureLimit(int limit) {
+    this.failureLimit = limit;
+    failureCounter.set(0);
+  }
+
   /** Since ObjectListing is immutable, we just override it with wrapper. */
+  @SuppressWarnings("serial")
   private static class CustomObjectListing extends ObjectListing {
 
     private final List<S3ObjectSummary> customListing;
@@ -506,6 +667,7 @@ public class InconsistentAmazonS3Client extends AmazonS3Client {
     }
   }
 
+  @SuppressWarnings("serial")
   private static class CustomListObjectsV2Result extends ListObjectsV2Result {
 
     private final List<S3ObjectSummary> customListing;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
new file mode 100644
index 0000000..9900f4c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Invoker.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.Optional;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.SdkBaseException;
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.io.retry.RetryPolicy;
+
+/**
+ * Class to provide lambda expression invocation of AWS operations.
+ *
+ * The core retry logic is in
+ * {@link #retryUntranslated(String, boolean, Retried, Operation)};
+ * the other {@code retry() and retryUntranslated()} calls are wrappers.
+ *
+ * The static {@link #once(String, String, Operation)} and
+ * {@link #once(String, String, VoidOperation)} calls take an operation and
+ * return it with AWS exceptions translated to IOEs of some form.
+ *
+ * The retry logic on a failure is defined by the retry policy passed in
+ * the constructor; the standard retry policy is {@link S3ARetryPolicy},
+ * though others may be used.
+ *
+ * The constructor also takes a {@link Retried} callback, the
+ * {@code retryCallback}, which is the default callback for those
+ * {@code retry()} and {@code retryUntranslated()} overloads which are
+ * not passed one explicitly.
+ * A callback is invoked after a retry is scheduled but before the sleep.
+ * These callbacks can be used for reporting and incrementing statistics.
+ *
+ * The static {@link #quietly(String, String, VoidOperation)} and
+ * {@link #quietlyEval(String, String, Operation)} calls exist to take any
+ * operation and quietly catch & log at debug. The return value of
+ * {@link #quietlyEval(String, String, Operation)} is a java 8 optional,
+ * which can then be used in java8-expressions.
+ */
+public class Invoker {
+  private static final Logger LOG = LoggerFactory.getLogger(Invoker.class);
+
+  /**
+   * Retry policy to use.
+   */
+  private final RetryPolicy retryPolicy;
+
+  /**
+   * Default retry handler.
+   */
+  private final Retried retryCallback;
+
+  /**
+   * Instantiate.
+   * @param retryPolicy retry policy for all operations.
+   * @param retryCallback default callback to invoke on retries.
+   */
+  public Invoker(
+      RetryPolicy retryPolicy,
+      Retried retryCallback) {
+    this.retryPolicy = retryPolicy;
+    this.retryCallback = retryCallback;
+  }
+
+  public RetryPolicy getRetryPolicy() {
+    return retryPolicy;
+  }
+
+  public Retried getRetryCallback() {
+    return retryCallback;
+  }
+
+  /**
+   * Execute a function, translating any exception into an IOException.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the function call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.OnceTranslated
+  public static <T> T once(String action, String path, Operation<T> operation)
+      throws IOException {
+    try {
+      return operation.execute();
+    } catch (AmazonClientException e) {
+      throw S3AUtils.translateException(action, path, e);
+    }
+  }
+
+  /**
+   * Execute an operation with no result.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param operation operation to execute
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.OnceTranslated
+  public static void once(String action, String path, VoidOperation operation)
+      throws IOException {
+    once(action, path,
+        () -> {
+          operation.execute();
+          return null;
+        });
+  }
+
+  /**
+   * Execute an operation and ignore all raised IOExceptions; log at INFO.
+   * @param log log to log at info.
+   * @param action action to include in log
+   * @param path optional path to include in log
+   * @param operation operation to execute
+   * @param <T> type of operation
+   */
+  public static <T> void ignoreIOExceptions(
+      Logger log,
+      String action,
+      String path,
+      Operation<T> operation) {
+    try {
+      once(action, path, operation);
+    } catch (IOException e) {
+      log.info("{}: {}", toDescription(action, path), e.toString(), e);
+    }
+  }
+
+  /**
+   * Execute an operation and ignore all raised IOExceptions; log at INFO.
+   * @param log log to log at info.
+   * @param action action to include in log
+   * @param path optional path to include in log
+   * @param operation operation to execute
+   */
+  public static void ignoreIOExceptions(
+      Logger log,
+      String action,
+      String path,
+      VoidOperation operation) {
+    ignoreIOExceptions(log, action, path,
+        () -> {
+          operation.execute();
+          return null;
+        });
+  }
+
+  /**
+   * Execute a void operation with retry processing.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param retrying callback on retries
+   * @param operation operation to execute
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.RetryTranslated
+  public void retry(String action,
+      String path,
+      boolean idempotent,
+      Retried retrying,
+      VoidOperation operation)
+      throws IOException {
+    retry(action, path, idempotent, retrying,
+        () -> {
+          operation.execute();
+          return null;
+        });
+  }
+
+  /**
+   * Execute a void operation with the default retry callback invoked.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param operation operation to execute
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.RetryTranslated
+  public void retry(String action,
+      String path,
+      boolean idempotent,
+      VoidOperation operation)
+      throws IOException {
+    retry(action, path, idempotent, retryCallback, operation);
+  }
+
+  /**
+   * Execute a function with the default retry callback invoked.
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.RetryTranslated
+  public <T> T retry(String action,
+      String path,
+      boolean idempotent,
+      Operation<T> operation)
+      throws IOException {
+
+    return retry(action, path, idempotent, retryCallback, operation);
+  }
+
+  /**
+   * Execute a function with retry processing.
+   * Uses {@link #once(String, String, Operation)} as the inner
+   * invocation mechanism before retry logic is performed.
+   * @param <T> type of return value
+   * @param action action to execute (used in error messages)
+   * @param path path of work (used in error messages)
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param retrying callback on retries
+   * @param operation operation to execute
+   * @return the result of the call
+   * @throws IOException any IOE raised, or translated exception
+   */
+  @Retries.RetryTranslated
+  public <T> T retry(
+      String action,
+      String path,
+      boolean idempotent,
+      Retried retrying,
+      Operation<T> operation)
+      throws IOException {
+    return retryUntranslated(
+        toDescription(action, path),
+        idempotent,
+        retrying,
+        () -> once(action, path, operation));
+  }
+
+  /**
+   * Execute a function with retry processing and no translation,
+   * using the default retry callback.
+   * @param text description for the catching callback
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param operation operation to execute
+   * @param <T> type of return value
+   * @return the result of the call
+   * @throws IOException any IOE raised
+   * @throws RuntimeException any Runtime exception raised
+   */
+  @Retries.RetryRaw
+  public <T> T retryUntranslated(
+      String text,
+      boolean idempotent,
+      Operation<T> operation) throws IOException {
+    return retryUntranslated(text, idempotent,
+        retryCallback, operation);
+  }
+
+  /**
+   * Execute a function with retry processing: AWS SDK Exceptions
+   * are <i>not</i> translated.
+   * This is the method which the others eventually invoke.
+   * @param <T> type of return value
+   * @param text text to include in messages
+   * @param idempotent does the operation have semantics
+   * which mean that it can be retried even if was already executed?
+   * @param retrying callback on retries
+   * @param operation operation to execute
+   * @return the result of the call
+   * @throws IOException any IOE raised
+   * @throws SdkBaseException any AWS exception raised
+   * @throws RuntimeException these are never caught and retried.
+   */
+  @Retries.RetryRaw
+  public <T> T retryUntranslated(
+      String text,
+      boolean idempotent,
+      Retried retrying,
+      Operation<T> operation) throws IOException {
+
+    Preconditions.checkArgument(retrying != null, "null retrying argument");
+    int retryCount = 0;
+    Exception caught;
+    RetryPolicy.RetryAction retryAction;
+    boolean shouldRetry;
+    do {
+      try {
+        // execute the operation, returning if successful
+        return operation.execute();
+      } catch (IOException | SdkBaseException e) {
+        caught = e;
+      }
+      // you only get here if the operation didn't complete
+      // normally, hence caught != null
+
+      // translate the exception into an IOE for the retry logic
+      IOException translated;
+      if (caught instanceof IOException) {
+        translated = (IOException) caught;
+      } else {
+        translated = S3AUtils.translateException(text, "",
+            (SdkBaseException)caught);
+      }
+
+
+      int attempts = retryCount + 1;
+      try {
+        // decide action based on operation, invocation count, etc
+        retryAction = retryPolicy.shouldRetry(translated, retryCount, 0,
+            idempotent);
+        // is it a retry operation?
+        shouldRetry = retryAction.action.equals(
+            RetryPolicy.RetryAction.RETRY.action);
+        if (shouldRetry) {
+          // notify the callback
+          retrying.onFailure(text, translated, retryCount, idempotent);
+          // then sleep for the policy delay
+          Thread.sleep(retryAction.delayMillis);
+        }
+        // increment the retry count
+        retryCount++;
+      } catch (InterruptedException e) {
+        // sleep was interrupted
+        // change the exception
+        caught = new InterruptedIOException("Interrupted");
+        caught.initCause(e);
+        // no retry
+        shouldRetry = false;
+        // and re-interrupt the thread
+        Thread.currentThread().interrupt();
+      } catch (Exception e) {
+        // The retry policy raised an exception
+        // log that something happened
+        LOG.warn("{}: exception in retry processing", text, e);
+        // and fail the execution with the last execution exception.
+        shouldRetry = false;
+      }
+    } while (shouldRetry);
+
+    if (caught instanceof IOException) {
+      throw (IOException) caught;
+    } else {
+      throw (SdkBaseException) caught;
+    }
+  }
+
+
+  /**
+   * Best-effort execution of an operation: it is run through
+   * {@code once()}, and whatever exception that raises is logged
+   * at debug and then dropped instead of being rethrown.
+   * @param action action to execute
+   * @param path path (for exception construction)
+   * @param operation operation
+   */
+  public static void quietly(
+      String action,
+      String path,
+      VoidOperation operation) {
+    try {
+      once(action, path, operation);
+    } catch (Exception ex) {
+      // swallowed on purpose: the caller asked for a quiet failure
+      LOG.debug("Action {} failed", action, ex);
+    }
+  }
+
+  /**
+   * Execute an operation; any exception raised is caught and
+   * logged at debug.
+   * The result is only non-empty if the operation succeeded and
+   * produced a non-null value: a successful operation which yields
+   * null also results in an empty Optional, without being logged
+   * as a failure.
+   * @param <T> type of the value the operation produces
+   * @param action action to execute
+   * @param path path (for exception construction)
+   * @param operation operation
+   * @return the operation's result, or empty if it failed or returned null
+   */
+  public static <T> Optional<T> quietlyEval(String action,
+      String path,
+      Operation<T> operation) {
+    try {
+      // ofNullable rather than of: a null result is not an error, and
+      // must not turn into an NPE which the catch below reports as one
+      return Optional.ofNullable(once(action, path, operation));
+    } catch (Exception e) {
+      LOG.debug("Action {} failed", action, e);
+      return Optional.empty();
+    }
+  }
+
+  /**
+   * Build the text used in logs to describe an action, adding
+   * the path to it whenever one has been supplied.
+   * @param action action
+   * @param path path (may be null or empty)
+   * @return string for logs
+   */
+  private static String toDescription(String action, String path) {
+    String suffix = "";
+    if (StringUtils.isNotEmpty(path)) {
+      suffix = " on " + path;
+    }
+    // always concatenate, so the result is never a null reference
+    return action + suffix;
+  }
+
+  /**
+   * Arbitrary operation throwing an IOException.
+   * Declared as a functional interface, so an implementation may be
+   * supplied as a lambda expression or a method reference.
+   * @param <T> return type
+   */
+  @FunctionalInterface
+  public interface Operation<T> {
+    /**
+     * Run the operation.
+     * @return the value produced by the operation
+     * @throws IOException if the operation fails
+     */
+    T execute() throws IOException;
+  }
+
+  /**
+   * Void operation which may raise an IOException.
+   * Declared as a functional interface, so an implementation may be
+   * supplied as a lambda expression or a method reference.
+   */
+  @FunctionalInterface
+  public interface VoidOperation {
+    /**
+     * Run the operation; there is no result.
+     * @throws IOException if the operation fails
+     */
+    void execute() throws IOException;
+  }
+
+  /**
+   * Callback for retry and notification operations.
+   * Even if the interface is throwing up "raw" exceptions, this handler
+   * gets the translated one.
+   */
+  @FunctionalInterface
+  public interface Retried {
+    /**
+     * Retry event in progress (before any sleep).
+     * As invoked from the four-argument {@code retryUntranslated()}, the
+     * call is only made once the policy has chosen to retry, and
+     * {@code retries} is the count before it is incremented for this
+     * failure: zero on the first failure.
+     * @param text the descriptive text which was passed in to the
+     * retrying call.
+     * @param exception the caught (and possibly translated) exception.
+     * @param retries number of retries so far
+     * @param idempotent is the request idempotent.
+     */
+    void onFailure(
+        String text,
+        IOException exception,
+        int retries,
+        boolean idempotent);
+  }
+
+  /**
+   * A retrying callback which ignores every event it is handed.
+   */
+  public static final Retried NO_OP =
+      (text, exception, retries, idempotent) -> { };
+
+  /**
+   * Retry callback which logs at debug only: a one-line summary of
+   * every failure, followed by the full stack trace when the retry
+   * count is exactly one, to keep noise down.
+   * NOTE(review): the four-argument retryUntranslated() passes zero
+   * for the first failure, so the stack trace appears on the second
+   * failure; confirm that this is the intended event.
+   */
+  public static final Retried LOG_EVENT =
+      (text, exception, retries, idempotent) -> {
+        if (!LOG.isDebugEnabled()) {
+          // nothing would be printed: skip building the summary string
+          return;
+        }
+        // the exception text is passed as an argument rather than being
+        // spliced into the format string, so it is always printed as-is
+        String summary = String.valueOf(exception);
+        LOG.debug("{}: {}", text, summary);
+        if (retries == 1) {
+          // the trailing Throwable is what produces the stack trace
+          LOG.debug("{}: {}", text, summary, exception);
+        }
+      };
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
index d9f059b..eb87705 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
@@ -385,8 +385,9 @@ public class Listing {
         status = statusBatchIterator.next();
         // We remove from provided list the file status listed by S3 so that
         // this does not return duplicate items.
-        LOG.debug("Removing the status from provided file status {}", status);
-        providedStatus.remove(status);
+        if (providedStatus.remove(status)) {
+          LOG.debug("Removed the status from provided file status {}", status);
+        }
       } else {
         if (providedStatusIterator.hasNext()) {
           status = providedStatusIterator.next();
@@ -540,10 +541,11 @@ public class Listing {
      * initial set of results/fail if there was a problem talking to the bucket.
      * @param listPath path of the listing
      * @param request initial request to make
-     * */
+     * @throws IOException if listObjects raises one.
+     */
     ObjectListingIterator(
         Path listPath,
-        S3ListRequest request) {
+        S3ListRequest request) throws IOException {
       this.listPath = listPath;
       this.maxKeys = owner.getMaxKeys();
       this.objects = owner.listObjects(request);
@@ -571,6 +573,7 @@ public class Listing {
      * @throws NoSuchElementException if there is no more data to list.
      */
     @Override
+    @Retries.RetryTranslated
     public S3ListResult next() throws IOException {
       if (firstListing) {
         // on the first listing, don't request more data.
@@ -814,19 +817,4 @@ public class Listing {
     }
   }
 
-  /**
-   * A Path filter which accepts all filenames.
-   */
-  static final PathFilter ACCEPT_ALL = new PathFilter() {
-    @Override
-    public boolean accept(Path file) {
-      return true;
-    }
-
-    @Override
-    public String toString() {
-      return "ACCEPT_ALL";
-    }
-  };
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/de8b6ca5/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
new file mode 100644
index 0000000..80ecf0c
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Retries.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Declaration of retry policy for documentation only.
+ * This is purely for visibility in source and is currently package-scoped.
+ * Compare with {@link org.apache.hadoop.io.retry.AtMostOnce}
+ * and {@link org.apache.hadoop.io.retry.Idempotent}; these are real
+ * markers used by Hadoop RPC.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class Retries {
+  /**
+   * No retry, exceptions are translated.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface OnceTranslated {
+    String value() default "";
+  }
+
+  /**
+   * No retry, exceptions are not translated.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface OnceRaw {
+    String value() default "";
+  }
+
+  /**
+   * No retry, expect a bit of both.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface OnceMixed {
+    String value() default "";
+  }
+
+  /**
+   * Retried, exceptions are translated.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface RetryTranslated {
+    String value() default "";
+  }
+
+  /**
+   * Retried, no translation.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface RetryRaw {
+    String value() default "";
+  }
+
+  /**
+   * Retried, mixed translation.
+   */
+  @Documented
+  @Retention(RetentionPolicy.SOURCE)
+  public @interface RetryMixed {
+    String value() default "";
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message