hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject [2/6] hadoop git commit: HADOOP-13028 add low level counter metrics for S3A; use in read performance tests. contributed by: stevel patch includes HADOOP-12844 Recover when S3A fails on IOException in read() HADOOP-13058 S3A FS fails during init against a
Date Thu, 12 May 2016 18:24:41 GMT
HADOOP-13028 add low level counter metrics for S3A; use in read performance tests. contributed by: stevel
patch includes
HADOOP-12844 Recover when S3A fails on IOException in read()
HADOOP-13058 S3A FS fails during init against a read-only FS if multipart purge is enabled
HADOOP-13047 S3a Forward seek in stream length to be configurable


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8d74a580
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8d74a580
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8d74a580

Branch: refs/heads/branch-2.8
Commit: 8d74a5804250cb07030a2aca85074e5f8d8bb276
Parents: 9b8f7a8
Author: Steve Loughran <stevel@apache.org>
Authored: Thu May 12 19:23:18 2016 +0100
Committer: Steve Loughran <stevel@apache.org>
Committed: Thu May 12 19:23:18 2016 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSDataInputStream.java |   9 +
 .../hadoop/metrics2/MetricStringBuilder.java    | 141 ++++++
 .../hadoop/metrics2/lib/MutableCounterLong.java |   2 +-
 .../src/main/resources/core-default.xml         |  10 +-
 .../hadoop-aws/dev-support/findbugs-exclude.xml | 357 +-------------
 .../apache/hadoop/fs/s3/FileSystemStore.java    |   4 +-
 .../org/apache/hadoop/fs/s3/S3Credentials.java  |   2 +
 .../fs/s3a/AnonymousAWSCredentialsProvider.java |   4 +
 .../fs/s3a/BasicAWSCredentialsProvider.java     |   4 +
 .../org/apache/hadoop/fs/s3a/Constants.java     |  18 +-
 .../hadoop/fs/s3a/S3AFastOutputStream.java      |  11 +-
 .../org/apache/hadoop/fs/s3a/S3AFileStatus.java |  12 +-
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java | 484 +++++++++++--------
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 342 +++++++++----
 .../hadoop/fs/s3a/S3AInstrumentation.java       | 457 +++++++++++++++++
 .../apache/hadoop/fs/s3a/S3AOutputStream.java   |  40 +-
 .../src/site/markdown/tools/hadoop-aws/index.md |  68 ++-
 .../hadoop/fs/s3a/scale/S3AScaleTestBase.java   | 191 +++++++-
 .../fs/s3a/scale/TestS3ADeleteManyFiles.java    |   1 -
 .../scale/TestS3AInputStreamPerformance.java    | 285 +++++++++++
 .../src/test/resources/log4j.properties         |   3 +
 21 files changed, 1768 insertions(+), 677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index da98769..640db59 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -234,4 +234,13 @@ public class FSDataInputStream extends DataInputStream
           "support unbuffering.");
     }
   }
+
+  /**
+   * String value. Includes the string value of the inner stream.
+   * @return the string value of this stream and of its inner stream
+   */
+  @Override
+  public String toString() {
+    return super.toString() + ": " + in;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java
new file mode 100644
index 0000000..18a499a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Build a string dump of the metrics.
+ *
+ * The {@link #toString()} operator dumps out all values collected.
+ *
+ * Every entry is formatted as
+ * {@code prefix + name + separator + value + suffix}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricStringBuilder extends MetricsRecordBuilder {
+
+  private final StringBuilder builder = new StringBuilder(256);
+
+  private final String prefix;
+  private final String suffix;
+  private final String separator;
+  private final MetricsCollector parent;
+
+  /**
+   * Build an instance.
+   * @param parent parent collector. Unused in this instance; only used for
+   * the {@link #parent()} method
+   * @param prefix string before each entry
+   * @param separator separator between name and value
+   * @param suffix suffix after each entry
+   */
+  public MetricStringBuilder(MetricsCollector parent,
+      String prefix,
+      String separator,
+      String suffix) {
+    this.parent = parent;
+    this.prefix = prefix;
+    this.suffix = suffix;
+    this.separator = separator;
+  }
+
+  public MetricStringBuilder add(MetricsInfo info, Object value) {
+    return tuple(info.name(), value.toString());
+  }
+
+  /**
+   * Append a key/value pair to the string: the prefix, then the key,
+   * the separator, the value and finally the suffix.
+   * @param key key
+   * @param value value
+   * @return this instance
+   */
+  public MetricStringBuilder tuple(String key, String value) {
+    builder.append(prefix)
+        .append(key)
+        .append(separator)
+        .append(value)
+        .append(suffix);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    return tuple(tag.name(), tag.value());
+  }
+
+  @Override
+  public MetricsRecordBuilder add(AbstractMetric metric) {
+    add(metric.info(), metric.toString());
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tuple("context", value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsCollector parent() {
+    return parent;
+  }
+
+  @Override
+  public String toString() {
+    return builder.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
index 03a6043..d3dec2e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
@@ -34,7 +34,7 @@ public class MutableCounterLong extends MutableCounter {
 
   private AtomicLong value = new AtomicLong();
 
-  MutableCounterLong(MetricsInfo info, long initValue) {
+  public MutableCounterLong(MetricsInfo info, long initValue) {
     super(info);
     this.value.set(initValue);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 3869c2c..2ad6881 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -911,7 +911,15 @@
     uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
 </property>
 
-  <property>
+<property>
+  <name>fs.s3a.readahead.range</name>
+  <value>65536</value>
+  <description>Bytes to read ahead during a seek() before closing and
+  re-opening the S3 HTTP connection. This option will be overridden if
+  any call to setReadahead() is made to an open stream.</description>
+</property>
+
+<property>
   <name>fs.s3a.fast.buffer.size</name>
   <value>1048576</value>
   <description>Size of initial memory buffer in bytes allocated for an

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
index 204e6ab..2b4160a 100644
--- a/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
+++ b/hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
@@ -15,361 +15,8 @@
    limitations under the License.
 -->
 <FindBugsFilter>
-     <Match>
-       <Package name="org.apache.hadoop.security.proto" />
-     </Match>
-     <Match>
-       <Package name="org.apache.hadoop.tools.proto" />
-     </Match>
-     <Match>
-       <Bug pattern="EI_EXPOSE_REP" />
-     </Match>
-     <Match>
-       <Bug pattern="EI_EXPOSE_REP2" />
-     </Match>
-     <Match>
-       <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
-     </Match>
-     <Match>
-       <Class name="~.*_jsp" />
-       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
-     </Match>
-     <Match>
-       <Field name="_jspx_dependants" />
-       <Bug pattern="UWF_UNWRITTEN_FIELD" />
-     </Match>
-     <!-- 
-       Inconsistent synchronization for Client.Connection.out is
-       is intentional to make a connection to be closed instantly. 
-     --> 
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="out" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!-- 
-       Further SaslException should be ignored during cleanup and
-       original exception should be re-thrown.
-     --> 
-     <Match>
-       <Class name="org.apache.hadoop.security.SaslRpcClient" />
-       <Bug pattern="DE_MIGHT_IGNORE" />
-     </Match>
-     <!-- 
-       Ignore Cross Scripting Vulnerabilities
-     -->
-     <Match>
-       <Package name="~org.apache.hadoop.mapred.*" />
-       <Bug code="XSS" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.taskdetails_jsp" />
-       <Bug code="HRS" />
-     </Match>
-     <!--
-       Ignore warnings where child class has the same name as
-       super class. Classes based on Old API shadow names from
-       new API. Should go off after HADOOP-1.0
-     -->
-     <Match>
-       <Class name="~org.apache.hadoop.mapred.*" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
-     </Match>
-     <!--
-       Ignore warnings for usage of System.exit. This is
-       required and have been well thought out
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Child$2" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.JobTracker" />
-       <Method name="addHostToNodeMapping" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Task" />
-       <Or>
-       <Method name="done" />
-       <Method name="commit" />
-       <Method name="statusUpdate" />
-       </Or>
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.util.ProgramDriver" />
-       <Method name="driver" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.util.RunJar" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <!--
-       We need to cast objects between old and new api objects
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
-       <Bug pattern="BC_UNCONFIRMED_CAST" />
-     </Match>
-     <!--
-       We intentionally do the get name from the inner class
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.TaskTracker$MapEventsFetcherThread" />
-       <Method name="run" />
-       <Bug pattern="IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
-       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
-     </Match>
-     <!--
-       Ignoring this warning as resolving this would need a non-trivial change in code 
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor" />
-       <Method name="configure" />
-       <Field name="maxNumItems" />
-       <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
-     </Match>
-     <!--
-       Comes from org.apache.jasper.runtime.ResourceInjector. Cannot do much.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.jobqueue_005fdetails_jsp" />
-       <Field name="_jspx_resourceInjector" />
-       <Bug pattern="SE_BAD_FIELD" />
-     </Match>
-     <!--
-       Storing textInputFormat and then passing it as a parameter. Safe to ignore.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob" />
-       <Method name="createValueAggregatorJob" />
-       <Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
-     </Match>
-     <!--
-       Can remove this after the upgrade to findbugs1.3.8
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat" />
-       <Method name="getSplits" />
-       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
-     </Match>
-    <!--
-      This is a spurious warning. Just ignore
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
-       <Field name="kvindex" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-
-     <!-- 
-        core changes 
-     -->
-     <Match>
-       <Class name="~org.apache.hadoop.*" />
-       <Bug code="MS" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.fs.FileSystem" />
-       <Method name="checkPath" />
-       <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.io.Closeable" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.security.AccessControlException" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.util.ProcfsBasedProcessTree" />
-       <Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
-     </Match>
-
-     <!--
-       Streaming, Examples
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.streaming.StreamUtil$TaskId" />
-       <Bug pattern="URF_UNREAD_FIELD" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.examples.DBCountPageView" />
-       <Method name="verify" />
-       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.examples.ContextFactory" />
-       <Method name="setAttributes" />
-       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
-     </Match>
-
-     <!--
-       TFile
-     -->
-      <Match>
-       <Class name="org.apache.hadoop.io.file.tfile.Chunk$ChunkDecoder" />
-       <Method name="close" />
-       <Bug pattern="SR_NOT_CHECKED" />
-      </Match>
-    <!--
-      The purpose of skip() is to drain remaining bytes of the chunk-encoded
-	  stream (one chunk at a time). The termination condition is checked by
-	  checkEOF().
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.io.file.tfile.Utils" />
-       <Method name="writeVLong" />
-       <Bug pattern="SF_SWITCH_FALLTHROUGH" />
-     </Match>
-    <!--
-	  The switch condition fall through is intentional and for performance
-	  purposes.
-    -->
-
-    <Match>
-      <Class name="org.apache.hadoop.log.EventCounter"/>
-      <!-- backward compatibility -->
-      <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    </Match>
-    <Match>
-      <Class name="org.apache.hadoop.metrics.jvm.EventCounter"/>
-      <!-- backward compatibility -->
-      <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
-    </Match>
-        <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.security\.proto\.SecurityProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
-    </Match>
-
-    <!--
-       Manually checked, misses child thread manually syncing on parent's intrinsic lock.
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" />
-       <Field name="previousSnapshot" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!--
-       The method uses a generic type T that extends two other types
-       T1 and T2. Findbugs complains of a cast from T1 to T2.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
-       <Method name="removeRenewAction" />
-       <Bug pattern="BC_UNCONFIRMED_CAST" />
-     </Match>
-     
-     <!-- Inconsistent synchronization flagged by findbugs is not valid. -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="in" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!-- 
-       The switch condition for INITIATE is expected to fallthru to RESPONSE
-       to process initial sasl response token included in the INITIATE
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Server$Connection" />
-       <Method name="processSaslMessage" />
-       <Bug pattern="SF_SWITCH_FALLTHROUGH" />
-     </Match>
-
-     <!-- Synchronization performed on util.concurrent instance. -->
-     <Match>
-       <Class name="org.apache.hadoop.service.AbstractService" />
-       <Method name="stop" />
-       <Bug code="JLM" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.service.AbstractService" />
-       <Method name="waitForServiceToStop" />
-       <Bug code="JLM" />
-     </Match>
-
-  <!--
-  OpenStack Swift FS module -closes streams in a different method
-  from where they are opened.
-  -->
-    <Match>
-      <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
-      <Method name="uploadFileAttempt"/>
-      <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
-    </Match>
-    <Match>
-      <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
-      <Method name="uploadFilePartAttempt"/>
-      <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
-    </Match>
-
-     <!-- code from maven source, null value is checked at callee side. -->
-     <Match>
-       <Class name="org.apache.hadoop.util.ComparableVersion$ListItem" />
-       <Method name="compareTo" />
-       <Bug code="NP" />
-     </Match>
-
+  <!-- S3n warnings about malicious code aren't that relevant given its limited future. -->
   <Match>
-    <Class name="org.apache.hadoop.util.HttpExceptionUtils"/>
-    <Method name="validateResponse"/>
-    <Bug pattern="REC_CATCH_EXCEPTION"/>
+    <Class name="org.apache.hadoop.fs.s3.INode" />
   </Match>
-
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java
index 07e456b..3c7ed60 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java
@@ -55,13 +55,13 @@ public interface FileSystemStore {
 
   /**
    * Delete everything. Used for testing.
-   * @throws IOException
+   * @throws IOException on any problem
    */
   void purge() throws IOException;
   
   /**
    * Diagnostic method to dump all INodes to the console.
-   * @throws IOException
+   * @throws IOException on any problem
    */
   void dump() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
index fdacc3f..5ab352a 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
@@ -38,6 +38,8 @@ public class S3Credentials {
   private String secretAccessKey; 
 
   /**
+   * @param uri bucket URI optionally containing username and password.
+   * @param conf configuration
    * @throws IllegalArgumentException if credentials for S3 cannot be
    * determined.
    * @throws IOException if credential providers are misconfigured and we have

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
index 2a24273..e62ec77 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
@@ -21,7 +21,11 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AnonymousAWSCredentials;
 import com.amazonaws.auth.AWSCredentials;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
 public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
   public AWSCredentials getCredentials() {
     return new AnonymousAWSCredentials();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
index 9a0adda..2f721e4 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
@@ -23,7 +23,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.AWSCredentials;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
 public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
   private final String accessKey;
   private final String secretKey;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index a0707bb..223b3aa 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -18,7 +18,19 @@
 
 package org.apache.hadoop.fs.s3a;
 
-public class Constants {
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * All the constants used with the {@link S3AFileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class Constants {
+
+  private Constants() {
+  }
+
   // s3 access key
   public static final String ACCESS_KEY = "fs.s3a.access.key";
 
@@ -129,4 +141,8 @@ public class Constants {
   public static final int S3A_DEFAULT_PORT = -1;
 
   public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
+
+  /** read ahead buffer size to prevent connection re-establishments. */
+  public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
+  public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
index 2e06fba..3353522 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.Progressable;
@@ -64,6 +65,7 @@ import java.util.concurrent.ThreadPoolExecutor;
  * <p>
  * Unstable: statistics and error handling might evolve
  */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class S3AFastOutputStream extends OutputStream {
 
@@ -102,7 +104,8 @@ public class S3AFastOutputStream extends OutputStream {
    * @param partSize size of a single part in a multi-part upload (except
    * last part)
    * @param multiPartThreshold files at least this size use multi-part upload
-   * @throws IOException
+   * @param threadPoolExecutor thread pool to which uploads are submitted
+   * @throws IOException on any problem
    */
   public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
       String bucket, String key, Progressable progress,
@@ -159,7 +162,7 @@ public class S3AFastOutputStream extends OutputStream {
    * Writes a byte to the memory buffer. If this causes the buffer to reach
    * its limit, the actual upload is submitted to the threadpool.
    * @param b the int of which the lowest byte is written
-   * @throws IOException
+   * @throws IOException on any problem
    */
   @Override
   public synchronized void write(int b) throws IOException {
@@ -177,10 +180,10 @@ public class S3AFastOutputStream extends OutputStream {
    * @param b byte array containing
    * @param off offset in array where to start
    * @param len number of bytes to be written
-   * @throws IOException
+   * @throws IOException on any problem
    */
   @Override
-  public synchronized void write(byte b[], int off, int len)
+  public synchronized void write(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
index 47caea8..9ecca33 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
@@ -17,9 +17,19 @@
  */
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * File status for an S3A "file".
+ * Modification time is trouble, see {@link #getModificationTime()}.
+ *
+ * The subclass is private as it should not be created directly.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AFileStatus extends FileStatus {
   private boolean isEmptyDirectory;
 
@@ -45,7 +55,7 @@ public class S3AFileStatus extends FileStatus {
     return System.getProperty("user.name");
   }
 
-  /** Compare if this object is equal to another object
+  /** Compare if this object is equal to another object.
    * @param   o the object to be compared.
    * @return  true if two file status has the same path name; false if not.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8d74a580/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index d47b7b5..d6a4617 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
@@ -59,8 +60,11 @@ import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -79,9 +83,24 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The core S3A Filesystem implementation.
+ *
+ * This subclass is marked as private as code should not be creating it
+ * directly; use {@link FileSystem#get(Configuration)} and variants to
+ * create one.
+ *
+ * If cast to {@code S3AFileSystem}, extra methods and features may be accessed.
+ * Consider those private and unstable.
+ *
+ * Because it prints some of the state of the instrumentation,
+ * the output of {@link #toString()} must also be considered unstable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AFileSystem extends FileSystem {
   /**
-   * Default blocksize as used in blocksize and FS status queries
+   * Default blocksize as used in blocksize and FS status queries.
    */
   public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
   private URI uri;
@@ -97,11 +116,14 @@ public class S3AFileSystem extends FileSystem {
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
+  private S3AInstrumentation instrumentation;
+  private long readAhead;
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
 
   private static final AtomicInteger poolNumber = new AtomicInteger(1);
+
   /**
    * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
    * with a common prefix.
@@ -110,17 +132,19 @@ public class S3AFileSystem extends FileSystem {
    */
   public static ThreadFactory getNamedThreadFactory(final String prefix) {
     SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
-        .getThreadGroup();
+    final ThreadGroup threadGroup = (s != null)
+        ? s.getThreadGroup()
+        : Thread.currentThread().getThreadGroup();
 
     return new ThreadFactory() {
-      final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final AtomicInteger threadNumber = new AtomicInteger(1);
       private final int poolNum = poolNumber.getAndIncrement();
-      final ThreadGroup group = threadGroup;
+      private final ThreadGroup group = threadGroup;
 
       @Override
       public Thread newThread(Runnable r) {
-        final String name = prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+        final String name = String.format("%s-pool%03d-t%04d",
+            prefix, poolNum, threadNumber.getAndIncrement());
         return new Thread(group, r, name);
       }
     };
@@ -157,10 +181,12 @@ public class S3AFileSystem extends FileSystem {
    */
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
+    setConf(conf);
+    instrumentation = new S3AInstrumentation(name);
 
     uri = URI.create(name.getScheme() + "://" + name.getAuthority());
-    workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
-        this.getWorkingDirectory());
+    workingDir = new Path("/user", System.getProperty("user.name"))
+        .makeQualified(this.uri, this.getWorkingDirectory());
 
     AWSAccessKeys creds = getAWSAccessKeys(name, conf);
 
@@ -174,19 +200,20 @@ public class S3AFileSystem extends FileSystem {
     bucket = name.getHost();
 
     ClientConfiguration awsConf = new ClientConfiguration();
-    awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
-      DEFAULT_MAXIMUM_CONNECTIONS));
+    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+        DEFAULT_MAXIMUM_CONNECTIONS, 1));
     boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
         DEFAULT_SECURE_CONNECTIONS);
     awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-    awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
-      DEFAULT_MAX_ERROR_RETRIES));
-    awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
-        DEFAULT_ESTABLISH_TIMEOUT));
-    awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
-      DEFAULT_SOCKET_TIMEOUT));
+    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+        DEFAULT_MAX_ERROR_RETRIES, 0));
+    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+        DEFAULT_ESTABLISH_TIMEOUT, 0));
+    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+        DEFAULT_SOCKET_TIMEOUT, 0));
     String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-    if(!signerOverride.isEmpty()) {
+    if (!signerOverride.isEmpty()) {
+      LOG.debug("Signer override = {}", signerOverride);
       awsConf.setSignerOverride(signerOverride);
     }
 
@@ -196,34 +223,38 @@ public class S3AFileSystem extends FileSystem {
 
     initAmazonS3Client(conf, credentials, awsConf);
 
-    maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
+    maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-      DEFAULT_MIN_MULTIPART_THRESHOLD);
-    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
-
     if (partSize < 5 * 1024 * 1024) {
       LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
       partSize = 5 * 1024 * 1024;
     }
 
+    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
+        DEFAULT_MIN_MULTIPART_THRESHOLD);
     if (multiPartThreshold < 5 * 1024 * 1024) {
       LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
       multiPartThreshold = 5 * 1024 * 1024;
     }
+    //check but do not store the block size
+    longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
+    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
-    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-    int coreThreads = conf.getInt(CORE_THREADS, DEFAULT_CORE_THREADS);
+    readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
+
+    int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
+    int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
     if (maxThreads == 0) {
       maxThreads = Runtime.getRuntime().availableProcessors() * 8;
     }
     if (coreThreads == 0) {
       coreThreads = Runtime.getRuntime().availableProcessors() * 8;
     }
-    long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+    long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
+        DEFAULT_KEEPALIVE_TIME, 0);
     LinkedBlockingQueue<Runnable> workQueue =
-      new LinkedBlockingQueue<>(maxThreads *
-        conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
+        new LinkedBlockingQueue<>(maxThreads *
+            intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
     threadPoolExecutor = new ThreadPoolExecutor(
         coreThreads,
         maxThreads,
@@ -238,19 +269,17 @@ public class S3AFileSystem extends FileSystem {
     initCannedAcls(conf);
 
     if (!s3.doesBucketExist(bucket)) {
-      throw new IOException("Bucket " + bucket + " does not exist");
+      throw new FileNotFoundException("Bucket " + bucket + " does not exist");
     }
 
     initMultipartUploads(conf);
 
     serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
 
-    setConf(conf);
   }
 
   void initProxySupport(Configuration conf, ClientConfiguration awsConf,
-      boolean secureConnections) throws IllegalArgumentException,
-      IllegalArgumentException {
+      boolean secureConnections) throws IllegalArgumentException {
     String proxyHost = conf.getTrimmed(PROXY_HOST, "");
     int proxyPort = conf.getInt(PROXY_PORT, -1);
     if (!proxyHost.isEmpty()) {
@@ -281,7 +310,8 @@ public class S3AFileSystem extends FileSystem {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
                 "domain {} as workstation {}", awsConf.getProxyHost(),
-            awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyPort(),
+            String.valueOf(awsConf.getProxyUsername()),
             awsConf.getProxyPassword(), awsConf.getProxyDomain(),
             awsConf.getProxyWorkstation());
       }
@@ -316,7 +346,7 @@ public class S3AFileSystem extends FileSystem {
       AWSCredentialsProviderChain credentials, ClientConfiguration awsConf)
       throws IllegalArgumentException {
     s3 = new AmazonS3Client(credentials, awsConf);
-    String endPoint = conf.getTrimmed(ENDPOINT,"");
+    String endPoint = conf.getTrimmed(ENDPOINT, "");
     if (!endPoint.isEmpty()) {
       try {
         s3.setEndpoint(endPoint);
@@ -359,14 +389,25 @@ public class S3AFileSystem extends FileSystem {
 
   private void initMultipartUploads(Configuration conf) {
     boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
-      DEFAULT_PURGE_EXISTING_MULTIPART);
-    long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
-      DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
+        DEFAULT_PURGE_EXISTING_MULTIPART);
+    long purgeExistingMultipartAge = longOption(conf,
+        PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
 
     if (purgeExistingMultipart) {
-      Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
+      Date purgeBefore =
+          new Date(new Date().getTime() - purgeExistingMultipartAge * 1000);
 
-      transfers.abortMultipartUploads(bucket, purgeBefore);
+      try {
+        transfers.abortMultipartUploads(bucket, purgeBefore);
+      } catch (AmazonServiceException e) {
+        if (e.getStatusCode() == 403) {
+          instrumentation.errorIgnored();
+          LOG.debug("Failed to abort multipart uploads against {}," +
+              " FS may be read only", bucket, e);
+        } else {
+          throw e;
+        }
+      }
     }
   }
 
@@ -479,16 +520,15 @@ public class S3AFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening '{}' for reading.", f);
-    }
+    LOG.debug("Opening '{}' for reading.", f);
     final FileStatus fileStatus = getFileStatus(f);
     if (fileStatus.isDirectory()) {
-      throw new FileNotFoundException("Can't open " + f + " because it is a directory");
+      throw new FileNotFoundException("Can't open " + f
+          + " because it is a directory");
     }
 
     return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-      fileStatus.getLen(), s3, statistics));
+      fileStatus.getLen(), s3, statistics, instrumentation, readAhead));
   }
 
   /**
@@ -514,16 +554,26 @@ public class S3AFileSystem extends FileSystem {
     if (!overwrite && exists(f)) {
       throw new FileAlreadyExistsException(f + " already exists");
     }
+    instrumentation.fileCreated();
     if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
       return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
           key, progress, statistics, cannedACL,
           serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
           threadPoolExecutor), statistics);
     }
-    // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
-    return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
-      bucket, key, progress, cannedACL, statistics,
-      serverSideEncryptionAlgorithm), null);
+    // We pass null to FSDataOutputStream so it won't count writes that
+    // are being buffered to a file
+    return new FSDataOutputStream(
+        new S3AOutputStream(getConf(),
+            transfers,
+            this,
+            bucket,
+            key,
+            progress,
+            cannedACL,
+            statistics,
+            serverSideEncryptionAlgorithm),
+        null);
   }
 
   /**
@@ -534,7 +584,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException indicating that append is not supported.
    */
   public FSDataOutputStream append(Path f, int bufferSize,
-    Progressable progress) throws IOException {
+      Progressable progress) throws IOException {
     throw new IOException("Not supported");
   }
 
@@ -559,17 +609,13 @@ public class S3AFileSystem extends FileSystem {
    * @return true if rename is successful
    */
   public boolean rename(Path src, Path dst) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Rename path {} to {}", src, dst);
-    }
+    LOG.debug("Rename path {} to {}", src, dst);
 
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
 
     if (srcKey.isEmpty() || dstKey.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: src or dst are empty");
-      }
+      LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey);
       return false;
     }
 
@@ -582,9 +628,8 @@ public class S3AFileSystem extends FileSystem {
     }
 
     if (srcKey.equals(dstKey)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: src and dst refer to the same file or directory");
-      }
+      LOG.debug("rename: src and dst refer to the same file or directory: {}",
+          dst);
       return srcStatus.isFile();
     }
 
@@ -593,9 +638,8 @@ public class S3AFileSystem extends FileSystem {
       dstStatus = getFileStatus(dst);
 
       if (srcStatus.isDirectory() && dstStatus.isFile()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("rename: src is a directory and dst is a file");
-        }
+        LOG.debug("rename: src {} is a directory and dst {} is a file",
+            src, dst);
         return false;
       }
 
@@ -603,6 +647,7 @@ public class S3AFileSystem extends FileSystem {
         return false;
       }
     } catch (FileNotFoundException e) {
+      LOG.debug("rename: destination path {} not found", dst);
       // Parent must exist
       Path parent = dst.getParent();
       if (!pathToKey(parent).isEmpty()) {
@@ -612,6 +657,8 @@ public class S3AFileSystem extends FileSystem {
             return false;
           }
         } catch (FileNotFoundException e2) {
+          LOG.debug("rename: destination path {} has no parent {}",
+              dst, parent);
           return false;
         }
       }
@@ -619,9 +666,7 @@ public class S3AFileSystem extends FileSystem {
 
     // Ok! Time to start
     if (srcStatus.isFile()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: renaming file " + src + " to " + dst);
-      }
+      LOG.debug("rename: renaming file {} to {}", src, dst);
       if (dstStatus != null && dstStatus.isDirectory()) {
         String newDstKey = dstKey;
         if (!newDstKey.endsWith("/")) {
@@ -630,15 +675,13 @@ public class S3AFileSystem extends FileSystem {
         String filename =
             srcKey.substring(pathToKey(src.getParent()).length()+1);
         newDstKey = newDstKey + filename;
-        copyFile(srcKey, newDstKey);
+        copyFile(srcKey, newDstKey, srcStatus.getLen());
       } else {
-        copyFile(srcKey, dstKey);
+        copyFile(srcKey, dstKey, srcStatus.getLen());
       }
       delete(src, false);
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: renaming directory " + src + " to " + dst);
-      }
+      LOG.debug("rename: renaming directory {} to {}", src, dst);
 
       // This is a directory to directory copy
       if (!dstKey.endsWith("/")) {
@@ -651,14 +694,12 @@ public class S3AFileSystem extends FileSystem {
 
       //Verify dest is not a child of the source directory
       if (dstKey.startsWith(srcKey)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("cannot rename a directory to a subdirectory of self");
-        }
+        LOG.debug("cannot rename a directory {}" +
+              " to a subdirectory of self: {}", srcKey, dstKey);
         return false;
       }
 
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete =
-        new ArrayList<>();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
       if (dstStatus != null && dstStatus.isEmptyDirectory()) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
@@ -676,7 +717,7 @@ public class S3AFileSystem extends FileSystem {
         for (S3ObjectSummary summary : objects.getObjectSummaries()) {
           keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
           String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
-          copyFile(summary.getKey(), newDstKey);
+          copyFile(summary.getKey(), newDstKey, summary.getSize());
 
           if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
             removeKeys(keysToDelete, true);
@@ -715,6 +756,7 @@ public class S3AFileSystem extends FileSystem {
       DeleteObjectsRequest deleteRequest
           = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
       s3.deleteObjects(deleteRequest);
+      instrumentation.fileDeleted(keysToDelete.size());
       statistics.incrementWriteOps(1);
     } else {
       int writeops = 0;
@@ -724,7 +766,7 @@ public class S3AFileSystem extends FileSystem {
             new DeleteObjectRequest(bucket, keyVersion.getKey()));
         writeops++;
       }
-
+      instrumentation.fileDeleted(keysToDelete.size());
       statistics.incrementWriteOps(writeops);
     }
     if (clearKeys) {
@@ -742,25 +784,20 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException due to inability to delete a directory or file.
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Delete path " + f + " - recursive " + recursive);
-    }
+    LOG.debug("Delete path {} - recursive {}", f , recursive);
     S3AFileStatus status;
     try {
       status = getFileStatus(f);
     } catch (FileNotFoundException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Couldn't delete " + f + " - does not exist");
-      }
+      LOG.debug("Couldn't delete {} - does not exist", f);
+      instrumentation.errorIgnored();
       return false;
     }
 
     String key = pathToKey(f);
 
     if (status.isDirectory()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("delete: Path is a directory");
-      }
+      LOG.debug("delete: Path is a directory: {}", f);
 
       if (!recursive && !status.isEmptyDirectory()) {
         throw new IOException("Path is a folder: " + f +
@@ -777,15 +814,12 @@ public class S3AFileSystem extends FileSystem {
       }
 
       if (status.isEmptyDirectory()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Deleting fake empty directory");
-        }
+        LOG.debug("Deleting fake empty directory {}", key);
         s3.deleteObject(bucket, key);
+        instrumentation.directoryDeleted();
         statistics.incrementWriteOps(1);
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Getting objects for directory prefix " + key + " to delete");
-        }
+        LOG.debug("Getting objects for directory prefix {} to delete", key);
 
         ListObjectsRequest request = new ListObjectsRequest();
         request.setBucketName(bucket);
@@ -794,16 +828,13 @@ public class S3AFileSystem extends FileSystem {
         //request.setDelimiter("/");
         request.setMaxKeys(maxKeys);
 
-        List<DeleteObjectsRequest.KeyVersion> keys =
-          new ArrayList<>();
+        List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
         ObjectListing objects = s3.listObjects(request);
         statistics.incrementReadOps(1);
         while (true) {
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
             keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Got object to delete " + summary.getKey());
-            }
+            LOG.debug("Got object to delete {}", summary.getKey());
 
             if (keys.size() == MAX_ENTRIES_TO_DELETE) {
               removeKeys(keys, true);
@@ -822,10 +853,9 @@ public class S3AFileSystem extends FileSystem {
         }
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("delete: Path is a file");
-      }
+      LOG.debug("delete: Path is a file");
       s3.deleteObject(bucket, key);
+      instrumentation.fileDeleted(1);
       statistics.incrementWriteOps(1);
     }
 
@@ -837,9 +867,7 @@ public class S3AFileSystem extends FileSystem {
   private void createFakeDirectoryIfNecessary(Path f) throws IOException {
     String key = pathToKey(f);
     if (!key.isEmpty() && !exists(f)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating new fake directory at " + f);
-      }
+      LOG.debug("Creating new fake directory at {}", f);
       createFakeDirectory(bucket, key);
     }
   }
@@ -856,9 +884,7 @@ public class S3AFileSystem extends FileSystem {
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
     String key = pathToKey(f);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("List status for path: " + f);
-    }
+    LOG.debug("List status for path: {}", f);
 
     final List<FileStatus> result = new ArrayList<FileStatus>();
     final FileStatus fileStatus =  getFileStatus(f);
@@ -874,9 +900,7 @@ public class S3AFileSystem extends FileSystem {
       request.setDelimiter("/");
       request.setMaxKeys(maxKeys);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("listStatus: doing listObjects for directory " + key);
-      }
+      LOG.debug("listStatus: doing listObjects for directory {}", key);
 
       ObjectListing objects = s3.listObjects(request);
       statistics.incrementReadOps(1);
@@ -889,24 +913,18 @@ public class S3AFileSystem extends FileSystem {
           // Skip over keys that are ourselves and old S3N _$folder$ files
           if (keyPath.equals(fQualified) ||
               summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Ignoring: " + keyPath);
-            }
+            LOG.debug("Ignoring: {}", keyPath);
             continue;
           }
 
           if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
             result.add(new S3AFileStatus(true, true, keyPath));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding: fd: " + keyPath);
-            }
+            LOG.debug("Adding: fd: {}", keyPath);
           } else {
             result.add(new S3AFileStatus(summary.getSize(),
                 dateToLong(summary.getLastModified()), keyPath,
                 getDefaultBlockSize(fQualified)));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding: fi: " + keyPath);
-            }
+            LOG.debug("Adding: fi: {}", keyPath);
           }
         }
 
@@ -916,16 +934,11 @@ public class S3AFileSystem extends FileSystem {
             continue;
           }
           result.add(new S3AFileStatus(true, false, keyPath));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding: rd: " + keyPath);
-          }
+          LOG.debug("Adding: rd: {}", keyPath);
         }
 
         if (objects.isTruncated()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("listStatus: list truncated - getting next batch");
-          }
-
+          LOG.debug("listStatus: list truncated - getting next batch");
           objects = s3.listNextBatchOfObjects(objects);
           statistics.incrementReadOps(1);
         } else {
@@ -933,9 +946,7 @@ public class S3AFileSystem extends FileSystem {
         }
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding: rd (not a dir): " + f);
-      }
+      LOG.debug("Adding: rd (not a dir): {}", f);
       result.add(fileStatus);
     }
 
@@ -948,14 +959,14 @@ public class S3AFileSystem extends FileSystem {
    * Set the current working directory for the given file system. All relative
    * paths will be resolved relative to it.
    *
-   * @param new_dir the current working directory.
+   * @param newDir the current working directory.
    */
-  public void setWorkingDirectory(Path new_dir) {
-    workingDir = new_dir;
+  public void setWorkingDirectory(Path newDir) {
+    workingDir = newDir;
   }
 
   /**
-   * Get the current working directory for the given file system
+   * Get the current working directory for the given file system.
    * @return the directory pathname
    */
   public Path getWorkingDirectory() {
@@ -972,10 +983,7 @@ public class S3AFileSystem extends FileSystem {
   // TODO: If we have created an empty file at /foo/bar and we then call
   // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Making directory: " + f);
-    }
-
+    LOG.debug("Making directory: {}", f);
 
     try {
       FileStatus fileStatus = getFileStatus(f);
@@ -996,6 +1004,7 @@ public class S3AFileSystem extends FileSystem {
                 fPart));
           }
         } catch (FileNotFoundException fnfe) {
+          instrumentation.errorIgnored();
         }
         fPart = fPart.getParent();
       } while (fPart != null);
@@ -1015,10 +1024,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public S3AFileStatus getFileStatus(Path f) throws IOException {
     String key = pathToKey(f);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting path status for " + f + " (" + key + ")");
-    }
-
+    LOG.debug("Getting path status for {}  ({})", f , key);
 
     if (!key.isEmpty()) {
       try {
@@ -1026,15 +1032,11 @@ public class S3AFileSystem extends FileSystem {
         statistics.incrementReadOps(1);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found exact file: fake directory");
-          }
+          LOG.debug("Found exact file: fake directory");
           return new S3AFileStatus(true, true,
               f.makeQualified(uri, workingDir));
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found exact file: normal file");
-          }
+          LOG.debug("Found exact file: normal file");
           return new S3AFileStatus(meta.getContentLength(),
               dateToLong(meta.getLastModified()),
               f.makeQualified(uri, workingDir),
@@ -1042,25 +1044,23 @@ public class S3AFileSystem extends FileSystem {
         }
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() != 404) {
-          printAmazonServiceException(e);
+          printAmazonServiceException(f.toString(), e);
           throw e;
         }
       } catch (AmazonClientException e) {
-        printAmazonClientException(e);
+        printAmazonClientException(f.toString(), e);
         throw e;
       }
 
       // Necessary?
       if (!key.endsWith("/")) {
+        String newKey = key + "/";
         try {
-          String newKey = key + "/";
           ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
           statistics.incrementReadOps(1);
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Found file (with /): fake directory");
-            }
+            LOG.debug("Found file (with /): fake directory");
             return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
           } else {
             LOG.warn("Found file (with /): real file? should not happen: {}", key);
@@ -1072,11 +1072,11 @@ public class S3AFileSystem extends FileSystem {
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
-            printAmazonServiceException(e);
+            printAmazonServiceException(newKey, e);
             throw e;
           }
         } catch (AmazonClientException e) {
-          printAmazonClientException(e);
+          printAmazonClientException(newKey, e);
           throw e;
         }
       }
@@ -1096,17 +1096,17 @@ public class S3AFileSystem extends FileSystem {
       statistics.incrementReadOps(1);
 
       if (!objects.getCommonPrefixes().isEmpty()
-          || objects.getObjectSummaries().size() > 0) {
+          || !objects.getObjectSummaries().isEmpty()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Found path as directory (with /): " +
-              objects.getCommonPrefixes().size() + "/" +
+          LOG.debug("Found path as directory (with /): {}/{}",
+              objects.getCommonPrefixes().size() ,
               objects.getObjectSummaries().size());
 
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-            LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
+            LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
           }
           for (String prefix : objects.getCommonPrefixes()) {
-            LOG.debug("Prefix: " + prefix);
+            LOG.debug("Prefix: {}", prefix);
           }
         }
 
@@ -1118,17 +1118,15 @@ public class S3AFileSystem extends FileSystem {
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
-        printAmazonServiceException(e);
+        printAmazonServiceException(key, e);
         throw e;
       }
     } catch (AmazonClientException e) {
-      printAmazonClientException(e);
+      printAmazonClientException(key, e);
       throw e;
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Not Found: " + f);
-    }
+    LOG.debug("Not Found: {}", f);
     throw new FileNotFoundException("No such file or directory: " + f);
   }
 
@@ -1147,15 +1145,13 @@ public class S3AFileSystem extends FileSystem {
    */
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
-    Path dst) throws IOException {
+      Path dst) throws IOException {
     String key = pathToKey(dst);
 
     if (!overwrite && exists(dst)) {
-      throw new IOException(dst + " already exists");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Copying local file from " + src + " to " + dst);
+      throw new FileAlreadyExistsException(dst + " already exists");
     }
+    LOG.debug("Copying local file from {} to {}", src, dst);
 
     // Since we have a local file, we don't need to stream into a temporary file
     LocalFileSystem local = getLocal(getConf());
@@ -1181,13 +1177,14 @@ public class S3AFileSystem extends FileSystem {
       }
     };
 
+    statistics.incrementWriteOps(1);
     Upload up = transfers.upload(putObjectRequest);
     up.addProgressListener(progressListener);
     try {
       up.waitForUploadResult();
-      statistics.incrementWriteOps(1);
     } catch (InterruptedException e) {
-      throw new IOException("Got interrupted, cancelling");
+      throw new InterruptedIOException("Interrupted copying " + src
+          + " to "  + dst + ", cancelling");
     }
 
     // This will delete unnecessary fake parent directories
@@ -1211,7 +1208,7 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-  * Override getCononicalServiceName because we don't support token in S3A
+  * Override getCanonicalServiceName because we don't support token in S3A.
   */
   @Override
   public String getCanonicalServiceName() {
@@ -1219,17 +1216,17 @@ public class S3AFileSystem extends FileSystem {
     return null;
   }
 
-  private void copyFile(String srcKey, String dstKey) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("copyFile " + srcKey + " -> " + dstKey);
-    }
+  private void copyFile(String srcKey, String dstKey, long size)
+      throws IOException {
+    LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
     ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
     ObjectMetadata dstom = cloneObjectMetadata(srcom);
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
     copyObjectRequest.setCannedAccessControlList(cannedACL);
     copyObjectRequest.setNewObjectMetadata(dstom);
 
@@ -1250,13 +1247,17 @@ public class S3AFileSystem extends FileSystem {
     try {
       copy.waitForCopyResult();
       statistics.incrementWriteOps(1);
+      instrumentation.filesCopied(1, size);
     } catch (InterruptedException e) {
-      throw new IOException("Got interrupted, cancelling");
+      throw new InterruptedIOException("Interrupted copying " + srcKey
+          + " to " + dstKey + ", cancelling");
     }
   }
 
   private boolean objectRepresentsDirectory(final String name, final long size) {
-    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
   }
 
   // Handles null Dates that can be returned by AWS
@@ -1274,8 +1275,9 @@ public class S3AFileSystem extends FileSystem {
 
   private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
     while (true) {
+      String key = "";
       try {
-        String key = pathToKey(f);
+        key = pathToKey(f);
         if (key.isEmpty()) {
           break;
         }
@@ -1283,13 +1285,13 @@ public class S3AFileSystem extends FileSystem {
         S3AFileStatus status = getFileStatus(f);
 
         if (status.isDirectory() && status.isEmptyDirectory()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Deleting fake directory " + key + "/");
-          }
+          LOG.debug("Deleting fake directory {}/", key);
           s3.deleteObject(bucket, key + "/");
           statistics.incrementWriteOps(1);
         }
       } catch (FileNotFoundException | AmazonServiceException e) {
+        LOG.debug("While deleting key {} ", key, e);
+        instrumentation.errorIgnored();
       }
 
       if (f.isRoot()) {
@@ -1325,10 +1327,12 @@ public class S3AFileSystem extends FileSystem {
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
+    PutObjectRequest putObjectRequest =
+        new PutObjectRequest(bucketName, objectName, im, om);
     putObjectRequest.setCannedAcl(cannedACL);
     s3.putObject(putObjectRequest);
     statistics.incrementWriteOps(1);
+    instrumentation.directoryCreated();
   }
 
   /**
@@ -1400,31 +1404,115 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Return the number of bytes that large input files should be optimally
-   * be split into to minimize i/o time.
+   * be split into to minimize I/O time.
    * @deprecated use {@link #getDefaultBlockSize(Path)} instead
    */
   @Deprecated
   public long getDefaultBlockSize() {
-    // default to 32MB: large enough to minimize the impact of seeks
     return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
 
-  private void printAmazonServiceException(AmazonServiceException ase) {
-    LOG.info("Caught an AmazonServiceException, which means your request made it " +
-        "to Amazon S3, but was rejected with an error response for some reason.");
-    LOG.info("Error Message: " + ase.getMessage());
-    LOG.info("HTTP Status Code: " + ase.getStatusCode());
-    LOG.info("AWS Error Code: " + ase.getErrorCode());
-    LOG.info("Error Type: " + ase.getErrorType());
-    LOG.info("Request ID: " + ase.getRequestId());
-    LOG.info("Class Name: " + ase.getClass().getName());
+  private void printAmazonServiceException(String target,
+      AmazonServiceException ase) {
+    LOG.info("{}: caught an AmazonServiceException {}", target, ase);
+    LOG.info("This means your request made it to Amazon S3," +
+        " but was rejected with an error response for some reason.");
+    LOG.info("Error Message: {}", ase.getMessage());
+    LOG.info("HTTP Status Code: {}", ase.getStatusCode());
+    LOG.info("AWS Error Code: {}", ase.getErrorCode());
+    LOG.info("Error Type: {}", ase.getErrorType());
+    LOG.info("Request ID: {}", ase.getRequestId());
+    LOG.info("Class Name: {}", ase.getClass().getName());
+    LOG.info("Exception", ase);
+  }
+
+  private void printAmazonClientException(String target,
+      AmazonClientException ace) {
+    LOG.info("{}: caught an AmazonClientException {}", target, ace);
+    LOG.info("This means the client encountered " +
+        "a problem while trying to communicate with S3, " +
+        "such as not being able to access the network.", ace);
   }
 
-  private void printAmazonClientException(AmazonClientException ace) {
-    LOG.info("Caught an AmazonClientException, which means the client encountered " +
-        "a serious internal problem while trying to communicate with S3, " +
-        "such as not being able to access the network.");
-    LOG.info("Error Message: {}" + ace, ace);
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "S3AFileSystem{");
+    sb.append("uri=").append(uri);
+    sb.append(", workingDir=").append(workingDir);
+    sb.append(", partSize=").append(partSize);
+    sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
+    sb.append(", maxKeys=").append(maxKeys);
+    sb.append(", cannedACL=").append(cannedACL.toString());
+    sb.append(", readAhead=").append(readAhead);
+    sb.append(", blockSize=").append(getDefaultBlockSize());
+    sb.append(", multiPartThreshold=").append(multiPartThreshold);
+    if (serverSideEncryptionAlgorithm != null) {
+      sb.append(", serverSideEncryptionAlgorithm='")
+          .append(serverSideEncryptionAlgorithm)
+          .append('\'');
+    }
+    sb.append(", statistics {")
+        .append(statistics.toString())
+        .append("}");
+    sb.append(", metrics {")
+        .append(instrumentation.dump("{", "=", "} ", true))
+        .append("}");
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Get the partition size for multipart operations.
+   * @return the value as set during initialization
+   */
+  public long getPartitionSize() {
+    return partSize;
+  }
+
+  /**
+   * Get the threshold for multipart files.
+   * @return the value as set during initialization
+   */
+  public long getMultiPartThreshold() {
+    return multiPartThreshold;
+  }
+
+  /**
+   * Get an integer option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static int intOption(Configuration conf, String key, int defVal, int min) {
+    int v = conf.getInt(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }
+
+  /**
+   * Get a long option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static long longOption(Configuration conf,
+      String key,
+      long defVal,
+      long min) {
+    long v = conf.getLong(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message