hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a..@apache.org
Subject [3/4] hadoop git commit: HADOOP-13203 S3A: Support fadvise "random" mode for high performance readPositioned() reads. Contributed by Rajesh Balamohan and stevel.
Date Wed, 22 Jun 2016 15:26:45 GMT
HADOOP-13203 S3A: Support fadvise "random" mode for high performance readPositioned() reads. Contributed by Rajesh Balamohan and stevel.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4ee35436
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4ee35436
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4ee35436

Branch: refs/heads/HDFS-1312
Commit: 4ee3543625c77c06d566fe81644d21c607d6d74d
Parents: 6708987
Author: Steve Loughran <stevel@apache.org>
Authored: Wed Jun 22 15:41:26 2016 +0100
Committer: Steve Loughran <stevel@apache.org>
Committed: Wed Jun 22 15:45:25 2016 +0100

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FSInputStream.java     |   5 +-
 .../fs/contract/AbstractContractSeekTest.java   |  11 +-
 .../org/apache/hadoop/fs/s3a/Constants.java     |  36 ++
 .../org/apache/hadoop/fs/s3a/S3AFileSystem.java |  29 +-
 .../apache/hadoop/fs/s3a/S3AInputPolicy.java    |  76 ++++
 .../apache/hadoop/fs/s3a/S3AInputStream.java    | 187 +++++++--
 .../hadoop/fs/s3a/S3AInstrumentation.java       |  20 +-
 .../org/apache/hadoop/fs/s3a/Statistic.java     |  32 +-
 .../src/site/markdown/tools/hadoop-aws/index.md |  76 ++++
 .../hadoop/fs/s3a/TestS3AInputPolicies.java     |  91 +++++
 .../scale/TestS3AInputStreamPerformance.java    | 398 +++++++++++++++----
 11 files changed, 819 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
index 64fbb45..672ab15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java
@@ -105,7 +105,10 @@ public abstract class FSInputStream extends InputStream
     Preconditions.checkArgument(buffer != null, "Null buffer");
     if (buffer.length - offset < length) {
       throw new IndexOutOfBoundsException(
-          FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER);
+          FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+              + ": request length=" + length
+              + ", with offset ="+ offset
+              + "; buffer capacity =" + (buffer.length - offset));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
index f1ca8cb..3e71682 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
@@ -271,7 +271,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
   public void testSeekBigFile() throws Throwable {
     describe("Seek round a large file and verify the bytes are what is expected");
     Path testSeekFile = path("bigseekfile.txt");
-    byte[] block = dataset(65536, 0, 255);
+    byte[] block = dataset(100 * 1024, 0, 255);
     createFile(getFileSystem(), testSeekFile, false, block);
     instream = getFileSystem().open(testSeekFile);
     assertEquals(0, instream.getPos());
@@ -291,6 +291,15 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assertEquals("@8191", block[8191], (byte) instream.read());
     instream.seek(0);
     assertEquals("@0", 0, (byte) instream.read());
+
+    // try read & readFully
+    instream.seek(0);
+    assertEquals(0, instream.getPos());
+    instream.read();
+    assertEquals(1, instream.getPos());
+    byte[] buf = new byte[80 * 1024];
+    instream.readFully(1, buf, 0, buf.length);
+    assertEquals(1, instream.getPos());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
index 4abb550..3f6f347 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * All the constants used with the {@link S3AFileSystem}.
+ *
+ * Some of the strings are marked as {@code Unstable}. This means
+ * that they may become unsupported in future, at which point they will be
+ * marked as deprecated and simply ignored.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
@@ -154,4 +158,36 @@ public final class Constants {
   /** read ahead buffer size to prevent connection re-establishments. */
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
   public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
+
+  /**
+   * Which input strategy to use for buffering, seeking and similar when
+   * reading data.
+   * Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final String INPUT_FADVISE =
+      "fs.s3a.experimental.input.fadvise";
+
+  /**
+   * General input. Some seeks, some reads.
+   * Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final String INPUT_FADV_NORMAL = "normal";
+
+  /**
+   * Optimized for sequential access.
+   * Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final String INPUT_FADV_SEQUENTIAL = "sequential";
+
+  /**
+   * Optimized purely for random seek+read/positionedRead operations.
+   * The performance of sequential IO may be reduced in exchange for
+   * more efficient {@code seek()} operations.
+   * Value: {@value}
+   */
+  @InterfaceStability.Unstable
+  public static final String INPUT_FADV_RANDOM = "random";
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
index 63bfb4f..513b57c 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
@@ -29,6 +29,7 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
@@ -82,7 +83,6 @@ import org.apache.hadoop.fs.s3native.S3xLoginHelper;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.VersionInfo;
 
-import static org.apache.commons.lang.StringUtils.*;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -126,6 +126,7 @@ public class S3AFileSystem extends FileSystem {
   private S3AInstrumentation instrumentation;
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
+  private S3AInputPolicy inputPolicy;
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
@@ -227,6 +228,8 @@ public class S3AFileSystem extends FileSystem {
 
       serverSideEncryptionAlgorithm =
           conf.getTrimmed(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+      inputPolicy = S3AInputPolicy.getPolicy(
+          conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL));
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
     }
@@ -482,6 +485,26 @@ public class S3AFileSystem extends FileSystem {
     return s3;
   }
 
+  /**
+   * Get the input policy for this FS instance.
+   * @return the input policy
+   */
+  @InterfaceStability.Unstable
+  public S3AInputPolicy getInputPolicy() {
+    return inputPolicy;
+  }
+
+  /**
+   * Change the input policy for this FS.
+   * @param inputPolicy new policy
+   */
+  @InterfaceStability.Unstable
+  public void setInputPolicy(S3AInputPolicy inputPolicy) {
+    Objects.requireNonNull(inputPolicy, "Null inputStrategy");
+    LOG.debug("Setting input strategy: {}", inputPolicy);
+    this.inputPolicy = inputPolicy;
+  }
+
   public S3AFileSystem() {
     super();
   }
@@ -537,7 +560,8 @@ public class S3AFileSystem extends FileSystem {
     }
 
     return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-      fileStatus.getLen(), s3, statistics, instrumentation, readAhead));
+      fileStatus.getLen(), s3, statistics, instrumentation, readAhead,
+        inputPolicy));
   }
 
   /**
@@ -1745,6 +1769,7 @@ public class S3AFileSystem extends FileSystem {
         "S3AFileSystem{");
     sb.append("uri=").append(uri);
     sb.append(", workingDir=").append(workingDir);
+    sb.append(", inputPolicy=").append(inputPolicy);
     sb.append(", partSize=").append(partSize);
     sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
     sb.append(", maxKeys=").append(maxKeys);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
new file mode 100644
index 0000000..c018410
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputPolicy.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+import static org.apache.hadoop.fs.s3a.Constants.*;
+
+/**
+ * Filesystem input policy.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public enum S3AInputPolicy {
+
+  Normal(INPUT_FADV_NORMAL),
+  Sequential(INPUT_FADV_SEQUENTIAL),
+  Random(INPUT_FADV_RANDOM);
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3AInputPolicy.class);
+  private final String policy;
+
+  S3AInputPolicy(String policy) {
+    this.policy = policy;
+  }
+
+  @Override
+  public String toString() {
+    return policy;
+  }
+
+  /**
+   * Choose an FS access policy.
+   * Always returns a policy:
+   * an unrecognized name is logged and downgraded to "normal".
+   * @param name strategy name from a configuration option, etc.
+   * @return the chosen strategy
+   */
+  public static S3AInputPolicy getPolicy(String name) {
+    String trimmed = name.trim().toLowerCase(Locale.ENGLISH);
+    switch (trimmed) {
+    case INPUT_FADV_NORMAL:
+      return Normal;
+    case INPUT_FADV_RANDOM:
+      return Random;
+    case INPUT_FADV_SEQUENTIAL:
+      return Sequential;
+    default:
+      LOG.warn("Unrecognized " + INPUT_FADVISE + " value: \"{}\"", trimmed);
+      return Normal;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 7b5b7b3..ccb9726 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -77,9 +77,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final long contentLength;
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
-  public static final long CLOSE_THRESHOLD = 4096;
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
-  private long readahead;
+  private final S3AInputPolicy inputPolicy;
+  private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
   /**
    * This is the actual position within the object, used by
@@ -87,8 +87,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private long nextReadPos;
 
-  /* Amount of data desired from the request */
-  private long requestedStreamLen;
+  /**
+   * The end of the content range of the last request.
+   * This is an absolute value of the range, not a length field.
+   */
+  private long contentRangeFinish;
+
+  /**
+   * The start of the content range of the last request.
+   */
+  private long contentRangeStart;
 
   public S3AInputStream(String bucket,
       String key,
@@ -96,7 +104,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       AmazonS3Client client,
       FileSystem.Statistics stats,
       S3AInstrumentation instrumentation,
-      long readahead) {
+      long readahead,
+      S3AInputPolicy inputPolicy) {
     Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
     Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
     Preconditions.checkArgument(contentLength >= 0 , "Negative content length");
@@ -107,6 +116,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.stats = stats;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
     this.streamStatistics = instrumentation.newInputStreamStatistics();
+    this.inputPolicy = inputPolicy;
     setReadahead(readahead);
   }
 
@@ -120,21 +130,23 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private synchronized void reopen(String reason, long targetPos, long length)
       throws IOException {
-    requestedStreamLen = this.contentLength;
 
     if (wrappedStream != null) {
-      closeStream("reopen(" + reason + ")", requestedStreamLen);
+      closeStream("reopen(" + reason + ")", contentRangeFinish);
     }
-    LOG.debug("reopen({}) for {} at targetPos={}, length={}," +
-        " requestedStreamLen={}, streamPosition={}, nextReadPosition={}",
-        uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
+
+    contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
+        length, contentLength, readahead);
+    LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
+        " streamPosition={}, nextReadPosition={}",
+        uri, reason, targetPos, contentRangeFinish, length,  pos, nextReadPos);
 
     streamStatistics.streamOpened();
     try {
       GetObjectRequest request = new GetObjectRequest(bucket, key)
-          .withRange(targetPos, requestedStreamLen);
+          .withRange(targetPos, contentRangeFinish);
       wrappedStream = client.getObject(request).getObjectContent();
-
+      contentRangeStart = targetPos;
       if (wrappedStream == null) {
         throw new IOException("Null IO stream from reopen of (" + reason +  ") "
             + uri);
@@ -205,8 +217,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       long forwardSeekRange = Math.max(readahead, available);
       // work out how much is actually left in the stream
       // then choose whichever comes first: the range or the EOF
-      long forwardSeekLimit = Math.min(remaining(), forwardSeekRange);
-      if (diff <= forwardSeekLimit) {
+      long remainingInCurrentRequest = remainingInCurrentRequest();
+
+      long forwardSeekLimit = Math.min(remainingInCurrentRequest,
+          forwardSeekRange);
+      boolean skipForward = remainingInCurrentRequest > 0
+          && diff <= forwardSeekLimit;
+      if (skipForward) {
         // the forward seek range is within the limits
         LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
         streamStatistics.seekForwards(diff);
@@ -231,14 +248,16 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       streamStatistics.seekBackwards(diff);
     } else {
       // targetPos == pos
-      // this should never happen as the caller filters it out.
-      // Retained just in case
-      LOG.debug("Ignoring seek {} to {} as target position == current",
-          uri, targetPos);
+      if (remainingInCurrentRequest() > 0) {
+        // if there is data left in the stream, keep going
+        return;
+      }
+
     }
 
+    // if the code reaches here, the stream needs to be reopened.
     // close the stream; if read the object will be opened at the new pos
-    closeStream("seekInStream()", this.requestedStreamLen);
+    closeStream("seekInStream()", this.contentRangeFinish);
     pos = targetPos;
   }
 
@@ -255,9 +274,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private void lazySeek(long targetPos, long len) throws IOException {
     //For lazy seek
-    if (targetPos != this.pos) {
-      seekInStream(targetPos, len);
-    }
+    seekInStream(targetPos, len);
 
     //re-open at specific location if needed
     if (wrappedStream == null) {
@@ -284,7 +301,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       return -1;
     }
 
-
     int byteRead;
     try {
       lazySeek(nextReadPos, 1);
@@ -328,7 +344,6 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * This updates the statistics on read operations started and whether
    * or not the read operation "completed", that is: returned the exact
    * number of bytes requested.
-   * @throws EOFException if there is no more data
    * @throws IOException if there are other problems
    */
   @Override
@@ -357,7 +372,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       streamStatistics.readOperationStarted(nextReadPos, len);
       bytesRead = wrappedStream.read(buf, off, len);
     } catch (EOFException e) {
-      throw e;
+      onReadFailure(e, len);
+      // the base implementation swallows EOFs.
+      return -1;
     } catch (IOException e) {
       onReadFailure(e, len);
       bytesRead = wrappedStream.read(buf, off, len);
@@ -397,7 +414,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       closed = true;
       try {
         // close or abort the stream
-        closeStream("close() operation", this.contentLength);
+        closeStream("close() operation", this.contentRangeFinish);
         // this is actually a no-op
         super.close();
       } finally {
@@ -420,13 +437,17 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    */
   private void closeStream(String reason, long length) {
     if (wrappedStream != null) {
-      boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
+
+      // if the amount of data remaining in the current request is greater
+      // than the readahead value: abort.
+      long remaining = remainingInCurrentRequest();
+      boolean shouldAbort = remaining > readahead;
       if (!shouldAbort) {
         try {
           // clean close. This will read to the end of the stream,
           // so, while cleaner, can be pathological on a multi-GB object
           wrappedStream.close();
-          streamStatistics.streamClose(false);
+          streamStatistics.streamClose(false, remaining);
         } catch (IOException e) {
           // exception escalates to an abort
           LOG.debug("When closing {} stream for {}", uri, reason, e);
@@ -437,11 +458,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
         wrappedStream.abort();
-        streamStatistics.streamClose(true);
+        streamStatistics.streamClose(true, remaining);
       }
       LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
-          " length={}",
-          uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos,
+          " request range {}-{} length={}",
+          uri, (shouldAbort ? "aborted" : "closed"), reason,
+          pos, nextReadPos,
+          contentRangeStart, contentRangeFinish,
           length);
       wrappedStream = null;
     }
@@ -451,7 +474,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   public synchronized int available() throws IOException {
     checkNotClosed();
 
-    long remaining = remaining();
+    long remaining = remainingInFile();
     if (remaining > Integer.MAX_VALUE) {
       return Integer.MAX_VALUE;
     }
@@ -462,10 +485,35 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * Bytes left in stream.
    * @return how many bytes are left to read
    */
-  protected long remaining() {
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public synchronized long remainingInFile() {
     return this.contentLength - this.pos;
   }
 
+  /**
+   * Bytes left in the current request.
+   * Only valid if there is an active request.
+   * @return how many bytes are left to read in the current GET.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public synchronized long remainingInCurrentRequest() {
+    return this.contentRangeFinish - this.pos;
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public synchronized long getContentRangeFinish() {
+    return contentRangeFinish;
+  }
+
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public synchronized long getContentRangeStart() {
+    return contentRangeStart;
+  }
+
   @Override
   public boolean markSupported() {
     return false;
@@ -480,15 +528,25 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   @Override
   @InterfaceStability.Unstable
   public String toString() {
-    final StringBuilder sb = new StringBuilder(
-        "S3AInputStream{");
-    sb.append(uri);
-    sb.append(" pos=").append(pos);
-    sb.append(" nextReadPos=").append(nextReadPos);
-    sb.append(" contentLength=").append(contentLength);
-    sb.append(" ").append(streamStatistics.toString());
-    sb.append('}');
-    return sb.toString();
+    String s = streamStatistics.toString();
+    synchronized (this) {
+      final StringBuilder sb = new StringBuilder(
+          "S3AInputStream{");
+      sb.append(uri);
+      sb.append(" wrappedStream=")
+          .append(wrappedStream != null ? "open" : "closed");
+      sb.append(" read policy=").append(inputPolicy);
+      sb.append(" pos=").append(pos);
+      sb.append(" nextReadPos=").append(nextReadPos);
+      sb.append(" contentLength=").append(contentLength);
+      sb.append(" contentRangeStart=").append(contentRangeStart);
+      sb.append(" contentRangeFinish=").append(contentRangeFinish);
+      sb.append(" remainingInCurrentRequest=")
+          .append(remainingInCurrentRequest());
+      sb.append('\n').append(s);
+      sb.append('}');
+      return sb.toString();
+    }
   }
 
   /**
@@ -542,7 +600,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   }
 
   @Override
-  public void setReadahead(Long readahead) {
+  public synchronized void setReadahead(Long readahead) {
     if (readahead == null) {
       this.readahead = Constants.DEFAULT_READAHEAD_RANGE;
     } else {
@@ -555,7 +613,48 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
    * Get the current readahead value.
    * @return a non-negative readahead value
    */
-  public long getReadahead() {
+  public synchronized long getReadahead() {
     return readahead;
   }
+
+  /**
+   * Calculate the limit for a get request, based on input policy
+   * and state of object.
+   * @param inputPolicy input policy
+   * @param targetPos position of the read
+   * @param length number of bytes requested; a negative value means "unknown"
+   * @param contentLength total length of file
+   * @param readahead current readahead value
+   * @return the request limit as an absolute object position, not a length.
+   */
+  static long calculateRequestLimit(
+      S3AInputPolicy inputPolicy,
+      long targetPos,
+      long length,
+      long contentLength,
+      long readahead) {
+    long rangeLimit;
+    switch (inputPolicy) {
+    case Random:
+      // positioned read: if the length is unknown, read to the end of the
+      // object; otherwise to here + the larger of length and readahead.
+      rangeLimit = (length < 0) ? contentLength
+          : targetPos + Math.max(readahead, length);
+      break;
+
+    case Sequential:
+      // sequential: plan for reading the entire object.
+      rangeLimit = contentLength;
+      break;
+
+    case Normal:
+    default:
+      rangeLimit = contentLength;
+
+    }
+    // cannot read past the end of the object
+    rangeLimit = Math.min(contentLength, rangeLimit);
+    return rangeLimit;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
index 8892f0e..514b974 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
@@ -67,6 +67,8 @@ public class S3AInstrumentation {
   private final MutableCounterLong streamReadOperations;
   private final MutableCounterLong streamReadFullyOperations;
   private final MutableCounterLong streamReadsIncomplete;
+  private final MutableCounterLong streamBytesReadInClose;
+  private final MutableCounterLong streamBytesDiscardedInAbort;
   private final MutableCounterLong ignoredErrors;
 
   private final MutableCounterLong numberOfFilesCreated;
@@ -75,7 +77,8 @@ public class S3AInstrumentation {
   private final MutableCounterLong numberOfFilesDeleted;
   private final MutableCounterLong numberOfDirectoriesCreated;
   private final MutableCounterLong numberOfDirectoriesDeleted;
-  private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
+  private final Map<String, MutableCounterLong> streamMetrics =
+      new HashMap<>(30);
 
   private static final Statistic[] COUNTERS_TO_CREATE = {
       INVOCATION_COPY_FROM_LOCAL_FILE,
@@ -125,6 +128,8 @@ public class S3AInstrumentation {
         streamCounter(STREAM_READ_FULLY_OPERATIONS);
     streamReadsIncomplete =
         streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
+    streamBytesReadInClose = streamCounter(STREAM_CLOSE_BYTES_READ);
+    streamBytesDiscardedInAbort = streamCounter(STREAM_ABORT_BYTES_DISCARDED);
     numberOfFilesCreated = counter(FILES_CREATED);
     numberOfFilesCopied = counter(FILES_COPIED);
     bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
@@ -362,6 +367,8 @@ public class S3AInstrumentation {
     streamReadOperations.incr(statistics.readOperations);
     streamReadFullyOperations.incr(statistics.readFullyOperations);
     streamReadsIncomplete.incr(statistics.readsIncomplete);
+    streamBytesReadInClose.incr(statistics.bytesReadInClose);
+    streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
   }
 
   /**
@@ -386,6 +393,8 @@ public class S3AInstrumentation {
     public long readOperations;
     public long readFullyOperations;
     public long readsIncomplete;
+    public long bytesReadInClose;
+    public long bytesDiscardedInAbort;
 
     private InputStreamStatistics() {
     }
@@ -426,13 +435,18 @@ public class S3AInstrumentation {
      * The inner stream was closed.
      * @param abortedConnection flag to indicate the stream was aborted,
      * rather than closed cleanly
+     * @param remainingInCurrentRequest the number of bytes remaining in
+     * the current request.
      */
-    public void streamClose(boolean abortedConnection) {
+    public void streamClose(boolean abortedConnection,
+        long remainingInCurrentRequest) {
       closeOperations++;
       if (abortedConnection) {
         this.aborted++;
+        bytesDiscardedInAbort += remainingInCurrentRequest;
       } else {
         closed++;
+        bytesReadInClose += remainingInCurrentRequest;
       }
     }
 
@@ -522,6 +536,8 @@ public class S3AInstrumentation {
       sb.append(", ReadOperations=").append(readOperations);
       sb.append(", ReadFullyOperations=").append(readFullyOperations);
       sb.append(", ReadsIncomplete=").append(readsIncomplete);
+      sb.append(", BytesReadInClose=").append(bytesReadInClose);
+      sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
       sb.append('}');
       return sb.toString();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
index d29cb2f..8121b3b 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
@@ -71,33 +71,37 @@ public enum Statistic {
   OBJECT_PUT_REQUESTS("object_put_requests",
       "Object put/multipart upload count"),
   OBJECT_PUT_BYTES("object_put_bytes", "number of bytes uploaded"),
-  STREAM_ABORTED("streamAborted",
+  STREAM_ABORTED("stream_aborted",
       "Count of times the TCP stream was aborted"),
-  STREAM_BACKWARD_SEEK_OPERATIONS("streamBackwardSeekOperations",
+  STREAM_BACKWARD_SEEK_OPERATIONS("stream_backward_seek_pperations",
       "Number of executed seek operations which went backwards in a stream"),
   STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
-  STREAM_CLOSE_OPERATIONS("streamCloseOperations",
+  STREAM_CLOSE_OPERATIONS("stream_close_operations",
       "Total count of times an attempt to close a data stream was made"),
-  STREAM_FORWARD_SEEK_OPERATIONS("streamForwardSeekOperations",
+  STREAM_FORWARD_SEEK_OPERATIONS("stream_forward_seek_operations",
       "Number of executed seek operations which went forward in a stream"),
   STREAM_OPENED("streamOpened",
       "Total count of times an input stream to object store was opened"),
-  STREAM_READ_EXCEPTIONS("streamReadExceptions",
+  STREAM_READ_EXCEPTIONS("stream_read_exceptions",
       "Number of seek operations invoked on input streams"),
-  STREAM_READ_FULLY_OPERATIONS("streamReadFullyOperations",
-      "count of readFully() operations in streams"),
-  STREAM_READ_OPERATIONS("streamReadOperations",
+  STREAM_READ_FULLY_OPERATIONS("stream_read_fully_operations",
+      "Count of readFully() operations in streams"),
+  STREAM_READ_OPERATIONS("stream_read_operations",
       "Count of read() operations in streams"),
-  STREAM_READ_OPERATIONS_INCOMPLETE("streamReadOperationsIncomplete",
+  STREAM_READ_OPERATIONS_INCOMPLETE("stream_read_operations_incomplete",
       "Count of incomplete read() operations in streams"),
-  STREAM_SEEK_BYTES_BACKWARDS("streamBytesBackwardsOnSeek",
+  STREAM_SEEK_BYTES_BACKWARDS("stream_bytes_backwards_on_seek",
       "Count of bytes moved backwards during seek operations"),
-  STREAM_SEEK_BYTES_READ("streamBytesRead",
+  STREAM_SEEK_BYTES_READ("stream_bytes_read",
       "Count of bytes read during seek() in stream operations"),
-  STREAM_SEEK_BYTES_SKIPPED("streamBytesSkippedOnSeek",
+  STREAM_SEEK_BYTES_SKIPPED("stream_bytes_skipped_on_seek",
       "Count of bytes skipped during forward seek operation"),
-  STREAM_SEEK_OPERATIONS("streamSeekOperations",
-      "Number of read exceptions caught and attempted to recovered from");
+  STREAM_SEEK_OPERATIONS("stream_seek_operations",
+      "Number of seek operations during stream IO."),
+  STREAM_CLOSE_BYTES_READ("stream_bytes_read_in_close",
+      "Count of bytes read when closing streams during seek operations."),
+  STREAM_ABORT_BYTES_DISCARDED("stream_bytes_discarded_in_abort",
+      "Count of bytes discarded by aborting the stream");
 
   Statistic(String symbol, String description) {
     this.symbol = symbol;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
index d677baa..a2bc2a9 100644
--- a/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
+++ b/hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
@@ -657,6 +657,78 @@ the available memory. These settings should be tuned to the envisioned
 workflow (some large files, many small ones, ...) and the physical
 limitations of the machine and cluster (memory, network bandwidth).
 
+### S3A Experimental "fadvise" input policy support
+
+**Warning: EXPERIMENTAL: behavior may change in future**
+
+The S3A Filesystem client supports the notion of input policies, similar
+to that of the Posix `fadvise()` API call. This tunes the behavior of the S3A
+client to optimise HTTP GET requests for the different use cases.
+
+#### "sequential" (default)
+
+Read through the file, possibly with some short forward seeks.
+
+The whole document is requested in a single HTTP request; forward seeks
+within the readahead range are supported by skipping over the intermediate
+data.
+
+This leads to maximum read throughput —but with very expensive
+backward seeks.
+
+
+#### "normal"
+
+This is currently the same as "sequential".
+
+#### "random"
+
+Optimised for random IO, specifically the Hadoop `PositionedReadable`
+operations —though `seek(offset); read(byte_buffer)` also benefits.
+
+Rather than ask for the whole file, the range of the HTTP request is
+set to that of the length of data desired in the `read` operation
+(rounded up to the readahead value set in `setReadahead()` if necessary).
+
+By reducing the cost of closing existing HTTP requests, this is
+highly efficient for file IO accessing a binary file
+through a series of `PositionedReadable.read()` and `PositionedReadable.readFully()`
+calls. Sequential reading of a file is expensive, as now many HTTP requests must
+be made to read through the file.
+
+For operations simply reading through a file: copying, distCp, reading
+Gzipped or other compressed formats, parsing .csv files, etc, the `sequential`
+policy is appropriate. This is the default: S3A does not need to be configured.
+
+For the specific case of high-performance random access IO, the `random` policy
+may be considered. The requirements are:
+
+* Data is read using the `PositionedReadable` API.
+* Long distance (many MB) forward seeks
+* Backward seeks as likely as forward seeks.
+* Little or no use of single character `read()` calls or small `read(buffer)`
+calls.
+* Applications running close to the S3 data store. That is: in EC2 VMs in
+the same datacenter as the S3 instance.
+
+The desired fadvise policy must be set in the configuration option
+`fs.s3a.experimental.input.fadvise` when the filesystem instance is created.
+That is: it can only be set on a per-filesystem basis, not on a per-file-read
+basis.
+
+    <property>
+      <name>fs.s3a.experimental.input.fadvise</name>
+      <value>random</value>
+      <description>Policy for reading files.
+       Values: 'random', 'sequential' or 'normal'
+       </description>
+    </property>
+
+[HDFS-2744](https://issues.apache.org/jira/browse/HDFS-2744),
+*Extend FSDataInputStream to allow fadvise* proposes adding a public API
+to set fadvise policies on input streams. Once implemented,
+this will become the supported mechanism used for configuring the input IO policy.
+
 ## Troubleshooting S3A
 
 Common problems working with S3A are
@@ -832,6 +904,10 @@ a failure should not lose data —it may result in duplicate datasets.
 * Because the write only begins on a `close()` operation, it may be in the final
 phase of a process where the write starts —this can take so long that some things
 can actually time out.
+* File IO performing many seek calls/positioned read calls will encounter
+performance problems due to the size of the HTTP requests made. On S3a,
+the (experimental) fadvise policy "random" can be set to alleviate this at the
+expense of sequential read performance and bandwidth.
 
 The slow performance of `rename()` surfaces during the commit phase of work,
 including

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
new file mode 100644
index 0000000..c0c8137
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputPolicies.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * Unit test of the input policy logic, without making any S3 calls.
+ */
+@RunWith(Parameterized.class)
+public class TestS3AInputPolicies {
+
+  private S3AInputPolicy policy;
+  private long targetPos;
+  private long length;
+  private long contentLength;
+  private long readahead;
+  private long expectedLimit;
+
+  public static final long _64K = 64 * 1024;
+  public static final long _128K = 128 * 1024;
+  public static final long _256K = 256 * 1024;
+  public static final long _1MB = 1024L * 1024;
+  public static final long _10MB = _1MB * 10;
+
+  public TestS3AInputPolicies(S3AInputPolicy policy,
+      long targetPos,
+      long length,
+      long contentLength,
+      long readahead,
+      long expectedLimit) {
+    this.policy = policy;
+    this.targetPos = targetPos;
+    this.length = length;
+    this.contentLength = contentLength;
+    this.readahead = readahead;
+    this.expectedLimit = expectedLimit;
+  }
+
+  @Parameterized.Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][]{
+        {S3AInputPolicy.Normal, 0, -1, 0, _64K, 0},
+        {S3AInputPolicy.Normal, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Normal, _64K, _64K, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Sequential, 0, -1, 0, _64K, 0},
+        {S3AInputPolicy.Sequential, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Random, 0, -1, 0, _64K, 0},
+        {S3AInputPolicy.Random, 0, -1, _10MB, _64K, _10MB},
+        {S3AInputPolicy.Random, 0, _128K, _10MB, _64K, _128K},
+        {S3AInputPolicy.Random, 0, _128K, _10MB, _256K, _256K},
+        {S3AInputPolicy.Random, 0, 0, _10MB, _256K, _256K},
+        {S3AInputPolicy.Random, 0, 1, _10MB, _256K, _256K},
+        {S3AInputPolicy.Random, 0, _1MB, _10MB, _256K, _1MB},
+        {S3AInputPolicy.Random, 0, _1MB, _10MB, 0, _1MB},
+        {S3AInputPolicy.Random, _10MB + _64K, _1MB, _10MB, _256K, _10MB},
+    });
+  }
+
+  @Test
+  public void testInputPolicies() throws Throwable {
+    Assert.assertEquals(
+        String.format("calculateRequestLimit(%s, %d, %d, %d, %d)",
+            policy, targetPos, length, contentLength, readahead),
+        expectedLimit,
+        S3AInputStream.calculateRequestLimit(policy, targetPos,
+            length, contentLength, readahead));
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4ee35436/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
index 5222a4e..752e374 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
@@ -22,11 +22,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
+import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.util.LineReader;
 import org.junit.After;
 import org.junit.Assume;
 import org.junit.Before;
@@ -34,6 +40,7 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.EOFException;
 import java.io.IOException;
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
@@ -53,13 +60,13 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
   public static final int BLOCK_SIZE = 32 * 1024;
   public static final int BIG_BLOCK_SIZE = 256 * 1024;
 
-  /** Tests only run if the there is a named test file that can be read */
+  /** Tests only run if there is a named test file that can be read. */
   private boolean testDataAvailable = true;
   private String assumptionMessage = "test file";
 
   /**
    * Open the FS and the test data. The input stream is always set up here.
-   * @throws IOException
+   * @throws IOException IO Problems.
    */
   @Before
   public void openFS() throws IOException {
@@ -70,9 +77,10 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
       testDataAvailable = false;
     } else {
       testData = new Path(testFile);
-      s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf);
+      Path path = this.testData;
+      bindS3aFS(path);
       try {
-        testDataStatus = s3aFS.getFileStatus(testData);
+        testDataStatus = s3aFS.getFileStatus(this.testData);
       } catch (IOException e) {
         LOG.warn("Failed to read file {} specified in {}",
             testFile, KEY_CSVTEST_FILE, e);
@@ -81,98 +89,131 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
     }
   }
 
+  private void bindS3aFS(Path path) throws IOException {
+    s3aFS = (S3AFileSystem) FileSystem.newInstance(path.toUri(), getConf());
+  }
+
   /**
    * Cleanup: close the stream, close the FS.
    */
   @After
   public void cleanup() {
+    describe("cleanup");
     IOUtils.closeStream(in);
     IOUtils.closeStream(s3aFS);
   }
 
   /**
-   * Declare that the test requires the CSV test dataset
+   * Declare that the test requires the CSV test dataset.
    */
   private void requireCSVTestData() {
     Assume.assumeTrue(assumptionMessage, testDataAvailable);
   }
 
   /**
-   * Open the test file with the read buffer specified in the setting
-   * {@link #KEY_READ_BUFFER_SIZE}
+   * Open the test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}; use the {@code Normal} policy.
    * @return the stream, wrapping an S3a one
-   * @throws IOException
+   * @throws IOException IO problems
    */
   FSDataInputStream openTestFile() throws IOException {
+    return openTestFile(S3AInputPolicy.Normal, 0);
+  }
+
+  /**
+   * Open the test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}.
+   * This includes the {@link #requireCSVTestData()} assumption; so
+   * if called before any FS op, will automatically skip the test
+   * if the CSV file is absent.
+   *
+   * @param inputPolicy input policy to use
+   * @param readahead readahead/buffer size
+   * @return the stream, wrapping an S3a one
+   * @throws IOException IO problems
+   */
+  FSDataInputStream openTestFile(S3AInputPolicy inputPolicy, long readahead)
+      throws IOException {
+    requireCSVTestData();
+    return openDataFile(s3aFS, this.testData, inputPolicy, readahead);
+  }
+
+  /**
+   * Open a test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}.
+   *
+   * @param path path to open
+   * @param inputPolicy input policy to use
+   * @param readahead readahead/buffer size
+   * @return the stream, wrapping an S3a one
+   * @throws IOException IO problems
+   */
+  private FSDataInputStream openDataFile(S3AFileSystem fs,
+      Path path,
+      S3AInputPolicy inputPolicy,
+      long readahead) throws IOException {
     int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
         DEFAULT_READ_BUFFER_SIZE);
-    FSDataInputStream stream = s3aFS.open(testData, bufferSize);
-    streamStatistics = getInputStreamStatistics(stream);
-    return stream;
+    S3AInputPolicy policy = fs.getInputPolicy();
+    fs.setInputPolicy(inputPolicy);
+    try {
+      FSDataInputStream stream = fs.open(path, bufferSize);
+      if (readahead >= 0) {
+        stream.setReadahead(readahead);
+      }
+      streamStatistics = getInputStreamStatistics(stream);
+      return stream;
+    } finally {
+      fs.setInputPolicy(policy);
+    }
   }
 
   /**
-   * assert tha the stream was only ever opened once
+   * Assert that the stream was only ever opened once.
    */
   protected void assertStreamOpenedExactlyOnce() {
     assertOpenOperationCount(1);
   }
 
   /**
-   * Make an assertion count about the number of open operations
+   * Make an assertion count about the number of open operations.
    * @param expected the expected number
    */
-  private void assertOpenOperationCount(int expected) {
-    assertEquals("open operations in " + streamStatistics,
+  private void assertOpenOperationCount(long expected) {
+    assertEquals("open operations in\n" + in,
         expected, streamStatistics.openOperations);
   }
 
   /**
    * Log how long an IOP took, by dividing the total time by the
-   * count of operations, printing in a human-readable form
+   * count of operations, printing in a human-readable form.
+   * @param operation operation being measured
    * @param timer timing data
    * @param count IOP count.
    */
-  protected void logTimePerIOP(NanoTimer timer, long count) {
-    LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count));
-  }
-
-  @Test
-  public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
-    requireCSVTestData();
-    describe("Open the test file %s and read it byte by byte", testData);
-    long len = testDataStatus.getLen();
-    NanoTimer timeOpen = new NanoTimer();
-    in = openTestFile();
-    timeOpen.end("Open stream");
-    NanoTimer readTimer = new NanoTimer();
-    long count = 0;
-    while (in.read() >= 0) {
-      count ++;
-    }
-    readTimer.end("Time to read %d bytes", len);
-    bandwidth(readTimer, count);
-    assertEquals("Not enough bytes were read)", len, count);
-    long nanosPerByte = readTimer.nanosPerOperation(count);
-    LOG.info("An open() call has the equivalent duration of reading {} bytes",
-        toHuman( timeOpen.duration() / nanosPerByte));
+  protected void logTimePerIOP(String operation,
+      NanoTimer timer,
+      long count) {
+    LOG.info("Time per {}: {} nS",
+        operation, toHuman(timer.duration() / count));
   }
 
   @Test
   public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
     requireCSVTestData();
+    int blockSize = _1MB;
     describe("Open the test file %s and read it in blocks of size %d",
-        testData, BLOCK_SIZE);
+        testData, blockSize);
     long len = testDataStatus.getLen();
     in = openTestFile();
-    byte[] block = new byte[BLOCK_SIZE];
+    byte[] block = new byte[blockSize];
     NanoTimer timer2 = new NanoTimer();
     long count = 0;
     // implicitly rounding down here
-    long blockCount = len / BLOCK_SIZE;
+    long blockCount = len / blockSize;
     for (long i = 0; i < blockCount; i++) {
       int offset = 0;
-      int remaining = BLOCK_SIZE;
+      int remaining = blockSize;
       NanoTimer blockTimer = new NanoTimer();
       int reads = 0;
       while (remaining > 0) {
@@ -189,15 +230,14 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
     }
     timer2.end("Time to read %d bytes in %d blocks", len, blockCount );
     bandwidth(timer2, count);
-    LOG.info("{}", streamStatistics);
+    logStreamStatistics();
   }
 
   @Test
   public void testLazySeekEnabled() throws Throwable {
-    requireCSVTestData();
     describe("Verify that seeks do not trigger any IO");
-    long len = testDataStatus.getLen();
     in = openTestFile();
+    long len = testDataStatus.getLen();
     NanoTimer timer = new NanoTimer();
     long blockCount = len / BLOCK_SIZE;
     for (long i = 0; i < blockCount; i++) {
@@ -206,24 +246,14 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
     in.seek(0);
     blockCount++;
     timer.end("Time to execute %d seeks", blockCount);
-    logTimePerIOP(timer, blockCount);
-    LOG.info("{}", streamStatistics);
+    logTimePerIOP("seek()", timer, blockCount);
+    logStreamStatistics();
     assertOpenOperationCount(0);
     assertEquals("bytes read", 0, streamStatistics.bytesRead);
   }
 
   @Test
-  public void testReadAheadDefault() throws Throwable {
-    requireCSVTestData();
-    describe("Verify that a series of forward skips within the readahead" +
-        " range do not close and reopen the stream");
-    executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
-    assertStreamOpenedExactlyOnce();
-  }
-
-  @Test
   public void testReadaheadOutOfRange() throws Throwable {
-    requireCSVTestData();
     try {
       in = openTestFile();
       in.setReadahead(-1L);
@@ -231,39 +261,75 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
     } catch (IllegalArgumentException e) {
       // expected
     }
-
   }
 
   @Test
-  public void testReadBigBlocksAvailableReadahead() throws Throwable {
-    requireCSVTestData();
-    describe("set readahead to available bytes only");
-    executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
-    // expect that the stream will have had lots of opens
-    assertTrue("not enough open operations in " + streamStatistics,
-        streamStatistics.openOperations > 1);
+  public void testReadWithNormalPolicy() throws Throwable {
+    describe("Read big blocks with a big readahead");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2,
+        S3AInputPolicy.Normal);
+    assertStreamOpenedExactlyOnce();
   }
 
   @Test
-  public void testReadBigBlocksBigReadahead() throws Throwable {
-    requireCSVTestData();
-    describe("Read big blocks with a big readahead");
-    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2);
+  public void testDecompressionSequential128K() throws Throwable {
+    describe("Decompress with a 128K readahead");
+    executeDecompression(128 * 1024, S3AInputPolicy.Sequential);
     assertStreamOpenedExactlyOnce();
   }
 
   /**
-   * Execute a seek+read sequence
+   * Execute a decompression + line read with the given input policy.
+   * @param readahead byte readahead
+   * @param inputPolicy read policy
+   * @throws IOException IO Problems
+   */
+  private void executeDecompression(long readahead,
+      S3AInputPolicy inputPolicy) throws IOException {
+    CompressionCodecFactory factory
+        = new CompressionCodecFactory(getConf());
+    CompressionCodec codec = factory.getCodec(testData);
+    long bytesRead = 0;
+    int lines = 0;
+
+    FSDataInputStream objectIn = openTestFile(inputPolicy, readahead);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    try (LineReader lineReader = new LineReader(
+        codec.createInputStream(objectIn), getConf())) {
+      Text line = new Text();
+      int read;
+      while ((read = lineReader.readLine(line)) > 0) {
+        bytesRead += read;
+        lines++;
+      }
+    } catch (EOFException eof) {
+      // done
+    }
+    timer.end("Time to read %d lines [%d bytes expanded, %d raw]" +
+        " with readahead = %d",
+        lines,
+        bytesRead,
+        testDataStatus.getLen(),
+        readahead);
+    logTimePerIOP("line read", timer, lines);
+    logStreamStatistics();
+  }
+
+  private void logStreamStatistics() {
+    LOG.info(String.format("Stream Statistics%n{}"), streamStatistics);
+  }
+
+  /**
+   * Execute a seek+read sequence.
    * @param blockSize block size for seeks
    * @param readahead what the readahead value of the stream should be
    * @throws IOException IO problems
    */
   protected void executeSeekReadSequence(long blockSize,
-      long readahead) throws IOException {
-    requireCSVTestData();
+      long readahead,
+      S3AInputPolicy policy) throws IOException {
+    in = openTestFile(policy, readahead);
     long len = testDataStatus.getLen();
-    in = openTestFile();
-    in.setReadahead(readahead);
     NanoTimer timer = new NanoTimer();
     long blockCount = len / blockSize;
     LOG.info("Reading {} blocks, readahead = {}",
@@ -277,11 +343,187 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
         blockCount,
         blockSize,
         readahead);
-    logTimePerIOP(timer, blockCount);
+    logTimePerIOP("seek(pos + " + blockCount+"); read()", timer, blockCount);
     LOG.info("Effective bandwidth {} MB/S",
         timer.bandwidthDescription(streamStatistics.bytesRead -
             streamStatistics.bytesSkippedOnSeek));
-    LOG.info("{}", streamStatistics);
+    logStreamStatistics();
   }
 
+  public static final int _4K = 4 * 1024;
+  public static final int _8K = 8 * 1024;
+  public static final int _16K = 16 * 1024;
+  public static final int _32K = 32 * 1024;
+  public static final int _64K = 64 * 1024;
+  public static final int _128K = 128 * 1024;
+  public static final int _256K = 256 * 1024;
+  public static final int _1MB = 1024 * 1024;
+  public static final int _2MB = 2 * _1MB;
+  public static final int _10MB = _1MB * 10;
+  public static final int _5MB = _1MB * 5;
+
+  private static final int[][] RANDOM_IO_SEQUENCE = {
+      {_2MB, _128K},
+      {_128K, _128K},
+      {_5MB, _64K},
+      {_1MB, _1MB},
+  };
+
+  @Test
+  public void testRandomIORandomPolicy() throws Throwable {
+    executeRandomIO(S3AInputPolicy.Random, (long) RANDOM_IO_SEQUENCE.length);
+    assertEquals("streams aborted in " + streamStatistics,
+        0, streamStatistics.aborted);
+  }
+
+  @Test
+  public void testRandomIONormalPolicy() throws Throwable {
+    long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
+    executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
+    assertEquals("streams aborted in " + streamStatistics,
+        4, streamStatistics.aborted);
+  }
+
+  /**
+   * Execute the random IO {@code readFully(pos, bytes[])} sequence defined by
+   * {@link #RANDOM_IO_SEQUENCE}. The stream is closed afterwards; the close
+   * happens after the timer is stopped, so it is not included in the timing.
+   * @param policy read policy
+   * @param expectedOpenCount expected number of stream openings
+   * @throws IOException IO problems
+   * @return the timer
+   */
+  private ContractTestUtils.NanoTimer executeRandomIO(S3AInputPolicy policy,
+      long expectedOpenCount)
+      throws IOException {
+    describe("Random IO with policy \"%s\"", policy);
+    byte[] buffer = new byte[_1MB];
+    long totalBytesRead = 0;
+
+    in = openTestFile(policy, 0);
+    ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
+    for (int[] action : RANDOM_IO_SEQUENCE) {
+      int position = action[0];
+      int range = action[1];
+      in.readFully(position, buffer, 0, range);
+      totalBytesRead += range;
+    }
+    int reads = RANDOM_IO_SEQUENCE.length;
+    timer.end("Time to execute %d reads of total size %d bytes",
+        reads,
+        totalBytesRead);
+    in.close();
+    assertOpenOperationCount(expectedOpenCount);
+    logTimePerIOP("byte read", timer, totalBytesRead);
+    LOG.info("Effective bandwidth {} MB/S",
+        timer.bandwidthDescription(streamStatistics.bytesRead -
+            streamStatistics.bytesSkippedOnSeek));
+    logStreamStatistics();
+    return timer;
+  }
+
+  S3AInputStream getS3aStream() {
+    return (S3AInputStream) in.getWrappedStream();
+  }
+
+  @Test
+  public void testRandomReadOverBuffer() throws Throwable {
+    describe("read over a buffer, making sure that the requests" +
+        " spans readahead ranges");
+    int datasetLen = _32K;
+    Path dataFile = new Path(getTestPath(), "testReadOverBuffer.bin");
+    byte[] sourceData = dataset(datasetLen, 0, 64);
+    // relies on the field 'fs' referring to the R/W FS
+    writeDataset(fs, dataFile, sourceData, datasetLen, _16K, true);
+    byte[] buffer = new byte[datasetLen];
+    int readahead = _8K;
+    int halfReadahead = _4K;
+    in = openDataFile(fs, dataFile, S3AInputPolicy.Random, readahead);
+
+    LOG.info("Starting initial reads");
+    S3AInputStream s3aStream = getS3aStream();
+    assertEquals(readahead, s3aStream.getReadahead());
+    byte[] oneByte = new byte[1];
+    assertEquals(1, in.read(0, oneByte, 0, 1));
+    // make some assertions about the current state
+    assertEquals("remaining in\n" + in,
+        readahead - 1, s3aStream.remainingInCurrentRequest());
+    assertEquals("range start in\n" + in,
+        0, s3aStream.getContentRangeStart());
+    assertEquals("range finish in\n" + in,
+        readahead, s3aStream.getContentRangeFinish());
+
+    assertStreamOpenedExactlyOnce();
+
+    describe("Starting sequence of positioned read calls over\n%s", in);
+    NanoTimer readTimer = new NanoTimer();
+    int currentPos = halfReadahead;
+    int offset = currentPos;
+    int bytesRead = 0;
+    int readOps = 0;
+
+    // make multiple read() calls
+    while (bytesRead < halfReadahead) {
+      int length = buffer.length - offset;
+      int read = in.read(currentPos, buffer, offset, length);
+      bytesRead += read;
+      offset += read;
+      readOps++;
+      assertEquals("open operations on request #" + readOps
+              + " after reading " + bytesRead
+              + " current position in stream " + currentPos
+              + " in\n" + fs
+              + "\n " + in,
+          1, streamStatistics.openOperations);
+      for (int i = currentPos; i < currentPos + read; i++) {
+        assertEquals("Wrong value from byte " + i,
+            sourceData[i], buffer[i]);
+      }
+      currentPos += read;
+    }
+    assertStreamOpenedExactlyOnce();
+    // assert at the end of the original block
+    assertEquals(readahead, currentPos);
+    readTimer.end("read %d in %d operations", bytesRead, readOps);
+    bandwidth(readTimer, bytesRead);
+    LOG.info("Time per byte(): {} nS",
+        toHuman(readTimer.nanosPerOperation(bytesRead)));
+    LOG.info("Time per read(): {} nS",
+        toHuman(readTimer.nanosPerOperation(readOps)));
+
+    describe("read last byte");
+    // read one more
+    int read = in.read(currentPos, buffer, bytesRead, 1);
+    assertTrue("-1 from last read", read >= 0);
+    assertOpenOperationCount(2);
+    assertEquals("Wrong value from read ", sourceData[currentPos],
+        (int) buffer[currentPos]);
+    currentPos++;
+
+
+    // now scan all the way to the end of the file, using single byte read()
+    // calls
+    describe("read() to EOF over \n%s", in);
+    long readCount = 0;
+    NanoTimer timer = new NanoTimer();
+    LOG.info("seeking");
+    in.seek(currentPos);
+    LOG.info("reading");
+    while(currentPos < datasetLen) {
+      int r = in.read();
+      assertTrue("Negative read() at position " + currentPos + " in\n" + in,
+          r >= 0);
+      buffer[currentPos] = (byte)r;
+      assertEquals("Wrong value from read from\n" + in,
+          sourceData[currentPos], r);
+      currentPos++;
+      readCount++;
+    }
+    timer.end("read %d bytes", readCount);
+    bandwidth(timer, readCount);
+    LOG.info("Time per read(): {} nS",
+        toHuman(timer.nanosPerOperation(readCount)));
+
+    assertEquals("last read in " + in, -1, in.read());
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message