hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ste...@apache.org
Subject [hadoop] 01/01: HADOOP-15870 patch 007
Date Thu, 28 Feb 2019 20:24:25 GMT
This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch s3/HADOOP-15920-HADOOP-15870-getNextPos
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 23a31be70a94a4a1e8ff47214726ae0d3c052884
Author: Steve Loughran <stevel@apache.org>
AuthorDate: Thu Feb 28 20:24:22 2019 +0000

    HADOOP-15870 patch 007
    
    Change-Id: I27620ddb3003ad54b3dfaa372f5dec598a7fbfb2
---
 .../site/markdown/filesystem/fsdatainputstream.md  | 53 +++++++++++++
 .../fs/contract/AbstractContractSeekTest.java      | 87 ++++++++++++++++++----
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   | 29 +++++---
 3 files changed, 146 insertions(+), 23 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
index e067b07..fe2471e 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdatainputstream.md
@@ -119,6 +119,59 @@ Return the data at the current position.
     else
         result = -1
 
+### <a name="InputStream.available"></a> `InputStream.available()`
+
+Returns the number of bytes "estimated" to be readable on a stream before `read()`
+blocks on any IO (i.e. the thread is potentially suspended for some time).
+
+That is: for all values `v` returned by `available()`, `read(buffer, 0, v)`
+should not block.
+
+#### Postconditions
+
+```python
+if len(data) == 0:
+  result = 0
+
+elif pos >= len(data):
+  result = 0
+
+else:
+  d = "the amount of data known to be already buffered/cached locally"
+  result = max(1, d)  # optional but recommended: see below.
+```
+
+As `0` is a number which always meets this condition, it is nominally
+possible for an implementation to simply return `0`. However, this is not
+considered useful, and some applications/libraries expect a positive number.
+
+#### The GZip problem.
+
+[JDK-7036144](http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144),
+"GZIPInputStream readTrailer uses faulty available() test for end-of-stream"
+discusses how the JDK's GZip code uses `available()` to detect an EOF,
+in a loop similar to the following:
+
+```java
+while(instream.available() > 0) {
+  process(instream.read());
+}
+```
+
+The correct loop would have been:
+
+```java
+int r;
+while((r=instream.read()) >= 0) {
+  process(r);
+}
+```
+
+If `available()` ever returns 0, then the gzip loop halts prematurely.
+
+For this reason, implementations *should* return a value &gt;=1, even
+if this breaks the requirement that `available()` returns the amount guaranteed
+not to block on reads.
 
 ### <a name="InputStream.read.buffer[]"></a> `InputStream.read(buffer[], offset,
length)`
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
index 3c1377a..3d2e7e6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractSeekTest.java
@@ -32,16 +32,13 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.Random;
 
-import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
-import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyRead;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 
 /**
  * Test Seek operations
  */
-public abstract class AbstractContractSeekTest extends AbstractFSContractTestBase {
+public abstract class AbstractContractSeekTest
+        extends AbstractFSContractTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractContractSeekTest.class);
 
@@ -99,14 +96,18 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     describe("seek and read a 0 byte file");
     instream = getFileSystem().open(zeroByteFile);
     assertEquals(0, instream.getPos());
+    assertAvailableIsZero(instream);
     //expect initial read to fai;
     int result = instream.read();
     assertMinusOne("initial byte read", result);
+    assertAvailableIsZero(instream);
     byte[] buffer = new byte[1];
     //expect that seek to 0 works
     instream.seek(0);
+    assertAvailableIsZero(instream);
     //reread, expect same exception
     result = instream.read();
+    assertAvailableIsZero(instream);
     assertMinusOne("post-seek byte read", result);
     result = instream.read(buffer, 0, 1);
     assertMinusOne("post-seek buffer read", result);
@@ -133,7 +134,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
   public void testSeekReadClosedFile() throws Throwable {
     instream = getFileSystem().open(smallSeekFile);
     getLogger().debug(
-      "Stream is of type " + instream.getClass().getCanonicalName());
+            "Stream is of type " + instream.getClass().getCanonicalName());
     instream.close();
     try {
       instream.seek(0);
@@ -172,6 +173,22 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
       // sure there's no other exception like an NPE.
 
     }
+    // a closed stream should either fail or return 0 bytes.
+    try {
+      int a = instream.available();
+      LOG.info("available() returns a value on a closed file: {}", a);
+      assertAvailableIsZero(instream);
+    } catch (IOException | IllegalStateException expected) {
+      // expected
+    }
+    // a closed stream should either fail or return 0 bytes.
+    try {
+      int a = instream.available();
+      LOG.info("available() returns a value on a closed file: {}", a);
+      assertAvailableIsZero(instream);
+    } catch (IOException | IllegalStateException expected) {
+      // expected
+    }
     //and close again
     instream.close();
   }
@@ -185,8 +202,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
       long p = instream.getPos();
       LOG.warn("Seek to -1 returned a position of " + p);
       int result = instream.read();
-      fail(
-        "expected an exception, got data " + result + " at a position of " + p);
+      fail("expected an exception, got data " + result
+          + " at a position of " + p);
     } catch (EOFException e) {
       //bad seek -expected
       handleExpectedException(e);
@@ -205,6 +222,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //expect that seek to 0 works
     instream.seek(0);
     int result = instream.read();
+    assertAvailableIsPositive(instream);
     assertEquals(0, result);
     assertEquals(1, instream.read());
     assertEquals(2, instream.getPos());
@@ -226,13 +244,24 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //go just before the end
     instream.seek(TEST_FILE_LEN - 2);
     assertTrue("Premature EOF", instream.read() != -1);
+    assertAvailableIsPositive(instream);
     assertTrue("Premature EOF", instream.read() != -1);
+    checkAvailabilityAtEOF();
     assertMinusOne("read past end of file", instream.read());
   }
 
+  /**
+   * This can be overridden if a filesystem returns a non-zero value at EOF.
+   * @throws IOException IO failure.
+   */
+  protected void checkAvailabilityAtEOF() throws IOException {
+    assertAvailableIsZero(instream);
+  }
+
   @Test
   public void testSeekPastEndOfFileThenReseekAndRead() throws Throwable {
-    describe("do a seek past the EOF, then verify the stream recovers");
+    describe("do a seek past the EOF, " +
+            "then verify the stream recovers");
     instream = getFileSystem().open(smallSeekFile);
     //go just before the end. This may or may not fail; it may be delayed until the
     //read
@@ -261,6 +290,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //now go back and try to read from a valid point in the file
     instream.seek(1);
     assertTrue("Premature EOF", instream.read() != -1);
+    assertAvailableIsPositive(instream);
   }
 
   /**
@@ -278,6 +308,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     //expect that seek to 0 works
     instream.seek(0);
     int result = instream.read();
+    assertAvailableIsPositive(instream);
     assertEquals(0, result);
     assertEquals(1, instream.read());
     assertEquals(2, instream.read());
@@ -296,6 +327,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream.seek(0);
     assertEquals(0, instream.getPos());
     instream.read();
+    assertAvailableIsPositive(instream);
     assertEquals(1, instream.getPos());
     byte[] buf = new byte[80 * 1024];
     instream.readFully(1, buf, 0, buf.length);
@@ -305,7 +337,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
   @Test
   public void testPositionedBulkReadDoesntChangePosition() throws Throwable {
     describe(
-      "verify that a positioned read does not change the getPos() value");
+            "verify that a positioned read does not change the getPos() value");
     assumeSupportsPositionedReadable();
     Path testSeekFile = path("bigseekfile.txt");
     byte[] block = dataset(65536, 0, 255);
@@ -314,7 +346,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream.seek(39999);
     assertTrue(-1 != instream.read());
     assertEquals(40000, instream.getPos());
-
+    assertAvailableIsPositive(instream);
     int v = 256;
     byte[] readBuffer = new byte[v];
     assertEquals(v, instream.read(128, readBuffer, 0, v));
@@ -322,6 +354,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assertEquals(40000, instream.getPos());
     //content is the same too
     assertEquals("@40000", block[40000], (byte) instream.read());
+    assertAvailableIsPositive(instream);
     //now verify the picked up data
     for (int i = 0; i < 256; i++) {
       assertEquals("@" + i, block[i + 128], readBuffer[i]);
@@ -376,6 +409,7 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     assertEquals(0, instream.getPos());
     byte[] buffer = new byte[1];
     instream.readFully(0, buffer, 0, 0);
+    assertAvailableIsZero(instream);
     assertEquals(0, instream.getPos());
     // seek to 0 read 0 bytes from it
     instream.seek(0);
@@ -424,7 +458,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
       fail("Expected an exception");
     } catch (EOFException e) {
       handleExpectedException(e);
-    } catch (IOException |IllegalArgumentException | IndexOutOfBoundsException e) {
+    } catch (IOException |IllegalArgumentException
+            | IndexOutOfBoundsException e) {
       handleRelaxedException("readFully with a negative position ",
           "EOFException",
           e);
@@ -551,7 +586,8 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
       fail("Expected an exception, got " + r);
     } catch (EOFException e) {
       handleExpectedException(e);
-    } catch (IOException | IllegalArgumentException | IndexOutOfBoundsException e) {
+    } catch (IOException | IllegalArgumentException
+            | IndexOutOfBoundsException e) {
       handleRelaxedException("read() with a negative position ",
           "EOFException",
           e);
@@ -587,6 +623,29 @@ public abstract class AbstractContractSeekTest extends AbstractFSContractTestBas
     instream = getFileSystem().open(smallSeekFile);
     instream.seek(TEST_FILE_LEN -1);
     assertTrue("read at last byte", instream.read() > 0);
+    assertAvailableIsZero(instream);
     assertEquals("read just past EOF", -1, instream.read());
   }
+
+  /**
+   * Assert that the number of bytes available is zero.
+   * @param in input stream
+   */
+  protected static void assertAvailableIsZero(FSDataInputStream in)
+      throws IOException {
+    assertEquals("stream.available() should be zero",
+        0, in.available());
+  }
+
+  /**
+   * Assert that the number of bytes available is greater than zero.
+   * @param in input stream
+   */
+  protected static void assertAvailableIsPositive(FSDataInputStream in)
+      throws IOException {
+    int available = in.available();
+    assertTrue("stream.available() should be positive but is "
+        + available,
+        available > 0);
+  }
 }
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index ccc86d0..c7042b3 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -197,7 +197,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead
{
   }
 
   @Override
-  public synchronized long getPos() throws IOException {
+  public synchronized long getPos() {
     return (nextReadPos < 0) ? 0 : nextReadPos;
   }
 
@@ -595,15 +595,26 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead
{
     return connectionOpen;
   }
 
+  /**
+   * Return the number of bytes available; 0 for an empty file or at EOF.
+   * If the inner stream is closed, the value is 1 for consistency
+   * with S3ObjectStream (and so addresses
+   * http://bugs.java.com/bugdatabase/view_bug.do?bug_id=7036144 ).
+   * If the stream is open, then it is the amount returned by the
+   * HTTP connection.
+   * @return a value greater than or equal to zero.
+   * @throws IOException IO failure.
+   */
   @Override
   public synchronized int available() throws IOException {
     checkNotClosed();
-
-    long remaining = remainingInFile();
-    if (remaining > Integer.MAX_VALUE) {
-      return Integer.MAX_VALUE;
+    if (contentLength == 0 || (nextReadPos >= contentLength)) {
+      return 0;
     }
-    return (int)remaining;
+
+    return wrappedStream == null
+        ? 1
+        : wrappedStream.available();
   }
 
   /**
@@ -612,8 +623,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead
{
    */
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
-  public synchronized long remainingInFile() {
-    return this.contentLength - this.pos;
+  public synchronized long remainingInFile() throws IOException {
+    return contentLength - getPos();
   }
 
   /**
@@ -624,7 +635,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead
{
   @InterfaceAudience.Private
   @InterfaceStability.Unstable
   public synchronized long remainingInCurrentRequest() {
-    return this.contentRangeFinish - this.pos;
+    return contentRangeFinish - getPos();
   }
 
   @InterfaceAudience.Private


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message