hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From szets...@apache.org
Subject svn commit: r1331570 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/test/java/org/apache/hadoop/hdfs/
Date Fri, 27 Apr 2012 20:13:17 GMT
Author: szetszwo
Date: Fri Apr 27 20:13:17 2012
New Revision: 1331570

URL: http://svn.apache.org/viewvc?rev=1331570&view=rev
Log:
HDFS-3334. Fix ByteRangeInputStream stream leakage.  Contributed by Daryn Sharp

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Apr 27 20:13:17 2012
@@ -917,6 +917,8 @@ Release 0.23.3 - UNRELEASED
 
     HDFS-3321. Fix safe mode turn off tip message.  (Ravi Prakash via szetszwo)
 
+    HDFS-3334. Fix ByteRangeInputStream stream leakage.  (Daryn Sharp via szetszwo)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Fri Apr 27 20:13:17 2012
@@ -27,6 +27,8 @@ import org.apache.commons.io.input.Bound
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
  * created each time. This class hides the complexity of those multiple 
@@ -61,7 +63,7 @@ public abstract class ByteRangeInputStre
   }
 
   enum StreamStatus {
-    NORMAL, SEEK
+    NORMAL, SEEK, CLOSED
   }
   protected InputStream in;
   protected URLOpener originalURL;
@@ -89,40 +91,51 @@ public abstract class ByteRangeInputStre
   protected abstract URL getResolvedUrl(final HttpURLConnection connection
       ) throws IOException;
 
-  private InputStream getInputStream() throws IOException {
-    if (status != StreamStatus.NORMAL) {
-      
-      if (in != null) {
-        in.close();
-        in = null;
-      }
-      
-      // Use the original url if no resolved url exists, eg. if
-      // it's the first time a request is made.
-      final URLOpener opener =
-        (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
-
-      final HttpURLConnection connection = opener.openConnection(startPos);
-      connection.connect();
-      checkResponseCode(connection);
-
-      final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-      if (cl == null) {
-        throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
-      }
-      final long streamlength = Long.parseLong(cl);
-      filelength = startPos + streamlength;
-      // Java has a bug with >2GB request streams.  It won't bounds check
-      // the reads so the transfer blocks until the server times out
-      in = new BoundedInputStream(connection.getInputStream(), streamlength);
-
-      resolvedURL.setURL(getResolvedUrl(connection));
-      status = StreamStatus.NORMAL;
+  @VisibleForTesting
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case NORMAL:
+        break;
+      case SEEK:
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        break;
+      case CLOSED:
+        throw new IOException("Stream closed");
     }
-    
     return in;
   }
   
+  @VisibleForTesting
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final URLOpener opener =
+      (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
+
+    final HttpURLConnection connection = opener.openConnection(startPos);
+    connection.connect();
+    checkResponseCode(connection);
+
+    final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
+    if (cl == null) {
+      throw new IOException(StreamFile.CONTENT_LENGTH+" header is missing");
+    }
+    final long streamlength = Long.parseLong(cl);
+    filelength = startPos + streamlength;
+    // Java has a bug with >2GB request streams.  It won't bounds check
+    // the reads so the transfer blocks until the server times out
+    InputStream is =
+        new BoundedInputStream(connection.getInputStream(), streamlength);
+
+    resolvedURL.setURL(getResolvedUrl(connection));
+    
+    return is;
+  }
+  
   private int update(final int n) throws IOException {
     if (n != -1) {
       currentPos += n;
@@ -150,17 +163,21 @@ public abstract class ByteRangeInputStre
    * The next read() will be from that location.  Can't
    * seek past the end of the file.
    */
+  @Override
   public void seek(long pos) throws IOException {
     if (pos != currentPos) {
       startPos = pos;
       currentPos = pos;
-      status = StreamStatus.SEEK;
+      if (status != StreamStatus.CLOSED) {
+        status = StreamStatus.SEEK;
+      }
     }
   }
 
   /**
    * Return the current offset from the start of the file
    */
+  @Override
   public long getPos() throws IOException {
     return currentPos;
   }
@@ -169,7 +186,17 @@ public abstract class ByteRangeInputStre
    * Seeks a different copy of the data.  Returns true if
    * found a new source, false otherwise.
    */
+  @Override
   public boolean seekToNewSource(long targetPos) throws IOException {
     return false;
   }
-}
\ No newline at end of file
+  
+  @Override
+  public void close() throws IOException {
+    if (in != null) {
+      in.close();
+      in = null;
+    }
+    status = StreamStatus.CLOSED;
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java Fri Apr 27 20:13:17 2012
@@ -19,8 +19,10 @@ package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -169,4 +171,74 @@ public static class MockHttpURLConnectio
                    "HTTP_OK expected, received 206", e.getMessage());
     }
   }
+  
+  @Test
+  public void testPropagatedClose() throws IOException {
+    ByteRangeInputStream brs = spy(
+        new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
+    
+    InputStream mockStream = mock(InputStream.class);
+    doReturn(mockStream).when(brs).openInputStream();
+
+    int brisOpens = 0;
+    int brisCloses = 0;
+    int isCloses = 0;
+    
+    // first open, shouldn't close underlying stream
+    brs.getInputStream();
+    verify(brs, times(++brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+    
+    // stream is open, shouldn't close underlying stream
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+    
+    // seek forces a reopen, should close underlying stream
+    brs.seek(1);
+    brs.getInputStream();
+    verify(brs, times(++brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(++isCloses)).close();
+
+    // verify that the underlying stream isn't closed after a seek
+    // ie. the state was correctly updated
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // seeking to same location should be a no-op
+    brs.seek(1);
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // close should of course close
+    brs.close();
+    verify(brs, times(++brisCloses)).close();
+    verify(mockStream, times(++isCloses)).close();
+    
+    // it's already closed, underlying stream should not close
+    brs.close();
+    verify(brs, times(++brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+    
+    // it's closed, don't reopen it
+    boolean errored = false;
+    try {
+      brs.getInputStream();
+    } catch (IOException e) {
+      errored = true;
+      assertEquals("Stream closed", e.getMessage());
+    } finally {
+      assertTrue("Read a closed steam", errored);
+    }
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+  }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java?rev=1331570&r1=1331569&r2=1331570&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java Fri Apr 27 20:13:17 2012
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.net.URI;
 import java.net.URL;
@@ -234,6 +235,45 @@ public class TestHftpFileSystem {
     assertEquals('7', in.read());
   }
 
+  @Test
+  public void testReadClosedStream() throws IOException {
+    final Path testFile = new Path("/testfile+2");
+    FSDataOutputStream os = hdfs.create(testFile, true);
+    os.writeBytes("0123456789");
+    os.close();
+
+    // ByteRangeInputStream delays opens until reads.  Make sure it doesn't
+    // open a closed stream that has never been opened
+    FSDataInputStream in = hftpFs.open(testFile);
+    in.close();
+    checkClosedStream(in);
+    checkClosedStream(in.getWrappedStream());
+    
+    // force the stream to connect and then close it
+    in = hftpFs.open(testFile);
+    int ch = in.read(); 
+    assertEquals('0', ch);
+    in.close();
+    checkClosedStream(in);
+    checkClosedStream(in.getWrappedStream());
+    
+    // make sure seeking doesn't automagically reopen the stream
+    in.seek(4);
+    checkClosedStream(in);
+    checkClosedStream(in.getWrappedStream());
+  }
+  
+  private void checkClosedStream(InputStream is) {
+    IOException ioe = null;
+    try {
+      is.read();
+    } catch (IOException e) {
+      ioe = e;
+    }
+    assertNotNull("No exception on closed read", ioe);
+    assertEquals("Stream closed", ioe.getMessage());
+  }
+  
   public void resetFileSystem() throws IOException {
     // filesystem caching has a quirk/bug that it caches based on the user's
     // given uri.  the result is if a filesystem is instantiated with no port,



Mime
View raw message