From: shv@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.
Date: Wed, 30 Aug 2017 22:47:22 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 abd741a3f -> 70df729a1


HDFS-8797. WebHdfsFileSystem creates too many connections for pread. Contributed by Jing Zhao.

(cherry picked from commit e91ccfad07ec5b5674a84009772dd31a82b4e4de)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/70df729a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/70df729a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/70df729a

Branch: refs/heads/branch-2.7
Commit: 70df729a1e5d47c1c21e73ef303a8ed85f289a85
Parents: abd741a
Author: Jing Zhao
Authored: Wed Jul 22 17:42:31 2015 -0700
Committer: Konstantin V Shvachko
Committed: Wed Aug 30 15:43:34 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |  2 +
 .../hadoop/hdfs/web/ByteRangeInputStream.java   | 57 +++++++++++++++++---
 .../hdfs/web/TestByteRangeInputStream.java      | 35 ++++++------
 .../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 39 ++++++++++++++
 4 files changed, 111 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
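Background sketch (not part of the patch itself): before this change, ByteRangeInputStream relied on the generic FSInputStream-style positional read, which seeks the shared stream to the requested offset, reads, and then seeks back. On WebHDFS a seek invalidates the current HTTP connection, so a single pread could force the stateful stream to reopen twice. The Java below is a simplified, hypothetical illustration of that seek-based pattern; RangeStream and its methods are illustrative stand-ins, not the actual Hadoop classes.

import java.io.IOException;

// Hypothetical stand-in for a connection-backed, seekable stream such as
// ByteRangeInputStream; the names here are illustrative only.
interface RangeStream {
  long getPos() throws IOException;
  void seek(long pos) throws IOException;   // on WebHDFS a seek forces the next read to reopen the connection
  int read(byte[] buf, int off, int len) throws IOException;
}

final class SeekBasedPread {
  // The inherited pread pattern this patch replaces: move the shared stream to
  // 'position', read, then restore the old offset. With a connection-backed
  // stream both seeks translate into fresh HTTP connections.
  static int pread(RangeStream in, long position, byte[] buf, int off, int len)
      throws IOException {
    long oldPos = in.getPos();
    try {
      in.seek(position);                    // reopen #1: at the pread offset
      return in.read(buf, off, len);
    } finally {
      in.seek(oldPos);                      // reopen #2: restore the stateful position
    }
  }
}

The patch below instead gives ByteRangeInputStream its own positional read and readFully overrides, each of which opens one bounded connection for the requested range and leaves the stateful stream alone.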
http://git-wip-us.apache.org/repos/asf/hadoop/blob/70df729a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1fdb100..845a964 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -10,6 +10,8 @@ Release 2.7.5 - UNRELEASED
 
     HDFS-9153. Pretty-format the output for DFSIO. (Kai Zheng via wheat9)
 
+    HDFS-8797. WebHdfsFileSystem creates too many connections for pread.
+    (jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70df729a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
index 9e3b29a..8e21b77 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs.web;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -66,6 +67,16 @@ public abstract class ByteRangeInputStream extends FSInputStream {
         final boolean resolved) throws IOException;
   }
 
+  static class InputStreamAndFileLength {
+    final Long length;
+    final InputStream in;
+
+    InputStreamAndFileLength(Long length, InputStream in) {
+      this.length = length;
+      this.in = in;
+    }
+  }
+
   enum StreamStatus {
     NORMAL, SEEK, CLOSED
   }
@@ -102,7 +113,9 @@ public abstract class ByteRangeInputStream extends FSInputStream {
       if (in != null) {
         in.close();
       }
-      in = openInputStream();
+      InputStreamAndFileLength fin = openInputStream(startPos);
+      in = fin.in;
+      fileLength = fin.length;
       status = StreamStatus.NORMAL;
       break;
     case CLOSED:
@@ -112,31 +125,33 @@ public abstract class ByteRangeInputStream extends FSInputStream {
   }
 
   @VisibleForTesting
-  protected InputStream openInputStream() throws IOException {
+  protected InputStreamAndFileLength openInputStream(long startOffset)
+      throws IOException {
     // Use the original url if no resolved url exists, eg. if
     // it's the first time a request is made.
     final boolean resolved = resolvedURL.getURL() != null;
     final URLOpener opener = resolved? resolvedURL: originalURL;
 
-    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    final HttpURLConnection connection = opener.connect(startOffset, resolved);
     resolvedURL.setURL(getResolvedUrl(connection));
 
     InputStream in = connection.getInputStream();
+    final Long length;
     final Map<String, List<String>> headers = connection.getHeaderFields();
     if (isChunkedTransferEncoding(headers)) {
       // file length is not known
-      fileLength = null;
+      length = null;
     } else {
       // for non-chunked transfer-encoding, get content-length
       long streamlength = getStreamLength(connection, headers);
-      fileLength = startPos + streamlength;
+      length = startOffset + streamlength;
 
       // Java has a bug with >2GB request streams.  It won't bounds check
      // the reads so the transfer blocks until the server times out
       in = new BoundedInputStream(in, streamlength);
     }
 
-    return in;
+    return new InputStreamAndFileLength(length, in);
   }
 
   private static long getStreamLength(HttpURLConnection connection,
@@ -230,6 +245,36 @@ public abstract class ByteRangeInputStream extends FSInputStream {
     }
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    try (InputStream in = openInputStream(position).in) {
+      return in.read(buffer, offset, length);
+    }
+  }
+
+  @Override
+  public void readFully(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    final InputStreamAndFileLength fin = openInputStream(position);
+    if (fin.length != null && length + position > fin.length) {
+      throw new EOFException("The length to read " + length
+          + " exceeds the file length " + fin.length);
+    }
+    try {
+      int nread = 0;
+      while (nread < length) {
+        int nbytes = fin.in.read(buffer, offset + nread, length - nread);
+        if (nbytes < 0) {
+          throw new EOFException("End of file reached before reading fully.");
+        }
+        nread += nbytes;
+      }
+    } finally {
+      fin.in.close();
+    }
+  }
+
   /**
    * Return the current offset from the start of the file
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70df729a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
index 11deab8..40f2b9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
@@ -35,7 +35,9 @@ import java.net.HttpURLConnection;
 import java.net.URL;
 
 import com.google.common.net.HttpHeaders;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.InputStreamAndFileLength;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestByteRangeInputStream {
@@ -140,8 +142,9 @@ public class TestByteRangeInputStream {
   public void testPropagatedClose() throws IOException {
     ByteRangeInputStream bris =
         mock(ByteRangeInputStream.class, CALLS_REAL_METHODS);
-    InputStream mockStream = mock(InputStream.class);
-    doReturn(mockStream).when(bris).openInputStream();
+    InputStreamAndFileLength mockStream = new InputStreamAndFileLength(1L,
+        mock(InputStream.class));
+    doReturn(mockStream).when(bris).openInputStream(Mockito.anyLong());
     Whitebox.setInternalState(bris, "status",
         ByteRangeInputStream.StreamStatus.SEEK);
 
@@ -151,46 +154,46 @@ public class TestByteRangeInputStream {
 
     // first open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // stream is open, shouldn't close underlying stream
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seek forces a reopen, should close underlying stream
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(++brisOpens)).openInputStream();
+    verify(bris, times(++brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // verify that the underlying stream isn't closed after a seek
     // ie. the state was correctly updated
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // seeking to same location should be a no-op
     bris.seek(1);
     bris.getInputStream();
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // close should of course close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(++isCloses)).close();
+    verify(mockStream.in, times(++isCloses)).close();
 
     // it's already closed, underlying stream should not close
     bris.close();
     verify(bris, times(++brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
 
     // it's closed, don't reopen it
     boolean errored = false;
@@ -202,9 +205,9 @@ public class TestByteRangeInputStream {
     } finally {
       assertTrue("Read a closed steam", errored);
     }
-    verify(bris, times(brisOpens)).openInputStream();
+    verify(bris, times(brisOpens)).openInputStream(Mockito.anyLong());
     verify(bris, times(brisCloses)).close();
-    verify(mockStream, times(isCloses)).close();
+    verify(mockStream.in, times(isCloses)).close();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/70df729a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
index f679f14..a87f4c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHDFS.java
@@ -591,6 +591,45 @@ public class TestWebHDFS {
     }
   }
 
+  @Test
+  public void testWebHdfsPread() throws Exception {
+    final Configuration conf = WebHdfsTestUtil.createConf();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
+        .build();
+    byte[] content = new byte[1024];
+    RANDOM.nextBytes(content);
+    final Path foo = new Path("/foo");
+    FSDataInputStream in = null;
+    try {
+      final WebHdfsFileSystem fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf,
+          WebHdfsFileSystem.SCHEME);
+      try (OutputStream os = fs.create(foo)) {
+        os.write(content);
+      }
+
+      // pread
+      in = fs.open(foo, 1024);
+      byte[] buf = new byte[1024];
+      try {
+        in.readFully(1020, buf, 0, 5);
+        Assert.fail("EOF expected");
+      } catch (EOFException ignored) {}
+
+      // mix pread with stateful read
+      int length = in.read(buf, 0, 512);
+      in.readFully(100, new byte[1024], 0, 100);
+      int preadLen = in.read(200, new byte[1024], 0, 200);
+      Assert.assertTrue(preadLen > 0);
+      IOUtils.readFully(in, buf, length, 1024 - length);
+      Assert.assertArrayEquals(content, buf);
+    } finally {
+      if (in != null) {
+        in.close();
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test(timeout=90000)
   public void testWebHdfsReadRetries() throws Exception {
     // ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
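For readers unfamiliar with the pread API exercised by the new test, here is a minimal, illustrative client-side sketch (not part of the patch). The NameNode address and file path are made-up placeholders; only the FileSystem/FSDataInputStream calls are standard Hadoop API. With this change, each positional read over WebHDFS uses a single bounded HTTP connection and does not disturb the stateful stream position.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsPreadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode HTTP address; point this at a real cluster.
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:50070"), conf);

    byte[] buf = new byte[200];
    try (FSDataInputStream in = fs.open(new Path("/foo"))) {
      // Positional reads: after HDFS-8797 each call opens one bounded HTTP
      // connection for the requested range and closes it when done.
      int n = in.read(1024L, buf, 0, buf.length);   // best-effort pread
      in.readFully(2048L, buf, 0, buf.length);      // reads exactly buf.length bytes or throws EOFException

      // The ordinary stateful read still uses the long-lived stream.
      int m = in.read(buf, 0, buf.length);
      System.out.println("pread bytes=" + n + ", stateful bytes=" + m);
    }
  }
}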